Posted to commits@apex.apache.org by th...@apache.org on 2017/04/24 16:16:04 UTC

[2/3] apex-malhar git commit: Grouping examples; POM and README changes.

Grouping examples; POM and README changes.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c79def4c
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c79def4c
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c79def4c

Branch: refs/heads/master
Commit: c79def4cd6f2bfb830aa32c9a9e455a7c4eb8385
Parents: 24027ed
Author: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Authored: Wed Mar 29 12:46:59 2017 -0700
Committer: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Committed: Mon Apr 24 09:06:40 2017 -0700

----------------------------------------------------------------------
 examples/hdfs2kafka/README.md                   |   4 -
 .../hdfs2kafka/XmlJavadocCommentsExtractor.xsl  |  44 ---
 examples/hdfs2kafka/pom.xml                     | 315 -------------------
 examples/hdfs2kafka/src/assemble/appPackage.xml |  43 ---
 .../java/com/example/myapexapp/Application.java |  26 --
 .../src/main/resources/META-INF/properties.xml  |  16 -
 .../com/example/myapexapp/ApplicationTest.java  | 132 --------
 .../src/test/resources/log4j.properties         |  22 --
 examples/kafka/README.md                        |  10 +
 examples/kafka/XmlJavadocCommentsExtractor.xsl  |  44 ---
 examples/kafka/pom.xml                          | 305 ++----------------
 .../java/com/example/myapexapp/KafkaApp.java    |  26 --
 .../example/myapexapp/LineOutputOperator.java   |  34 --
 .../examples/kafka/hdfs2kafka/Application.java  |  26 ++
 .../examples/kafka/kafka2hdfs/KafkaApp.java     |  26 ++
 .../kafka/kafka2hdfs/LineOutputOperator.java    |  34 ++
 .../META-INF/properties-hdfs2kafka.xml          |  16 +
 .../com/example/myapexapp/ApplicationTest.java  | 152 ---------
 .../kafka/hdfs2kafka/ApplicationTest.java       | 125 ++++++++
 .../kafka/kafka2hdfs/ApplicationTest.java       | 150 +++++++++
 examples/pom.xml                                |   1 +
 pom.xml                                         |   2 +-
 22 files changed, 420 insertions(+), 1133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/README.md
----------------------------------------------------------------------
diff --git a/examples/hdfs2kafka/README.md b/examples/hdfs2kafka/README.md
deleted file mode 100644
index 166abd3..0000000
--- a/examples/hdfs2kafka/README.md
+++ /dev/null
@@ -1,4 +0,0 @@
-This sample application shows how to read lines from files in HDFS and write
-them out to a Kafka topic. Each line of the input file is considered a separate
-message. The topic name, the name of the directory that is monitored for input
-files, and other parameters are configurable in `META_INF/properties.xml`.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/XmlJavadocCommentsExtractor.xsl
----------------------------------------------------------------------
diff --git a/examples/hdfs2kafka/XmlJavadocCommentsExtractor.xsl b/examples/hdfs2kafka/XmlJavadocCommentsExtractor.xsl
deleted file mode 100644
index 08075a9..0000000
--- a/examples/hdfs2kafka/XmlJavadocCommentsExtractor.xsl
+++ /dev/null
@@ -1,44 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed under the Apache License, Version 2.0 (the "License");
-    you may not use this file except in compliance with the License.
-    You may obtain a copy of the License at
-
-            http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-
--->
-
-<!--
-    Document   : XmlJavadocCommentsExtractor.xsl
-    Created on : September 16, 2014, 11:30 AM
-    Description:
-        The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
--->
-
-<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
-  <xsl:output method="xml" standalone="yes"/>
-
-  <!-- copy xml by selecting only the following nodes, attributes and text -->
-  <xsl:template match="node()|text()|@*">
-    <xsl:copy>
-      <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
-    </xsl:copy>
-  </xsl:template>
-
-  <!-- Strip off the following paths from the selected xml -->
-  <xsl:template match="//root/package/interface/interface
-                      |//root/package/interface/method/@qualified
-                      |//root/package/class/interface
-                      |//root/package/class/class
-                      |//root/package/class/method/@qualified
-                      |//root/package/class/field/@qualified" />
-
-  <xsl:strip-space elements="*"/>
-</xsl:stylesheet>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/pom.xml
----------------------------------------------------------------------
diff --git a/examples/hdfs2kafka/pom.xml b/examples/hdfs2kafka/pom.xml
deleted file mode 100644
index 75cfb6d..0000000
--- a/examples/hdfs2kafka/pom.xml
+++ /dev/null
@@ -1,315 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  
-  <groupId>com.example</groupId>
-  <version>1.0-SNAPSHOT</version>
-  <artifactId>hdfsToKafka</artifactId>
-  <packaging>jar</packaging>
-
-  <!-- change these to the appropriate values -->
-  <name>HDFS to Kafka</name>
-  <description>Simple application to transfer data from HDFS to Kafka</description>
-
-  <properties>
-    <!-- change this if you desire to use a different version of Apex Core -->
-    <apex.version>3.5.0</apex.version>
-    <malhar.version>3.6.0</malhar.version>
-    <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
-  </properties>
-
-  <build>
-    <plugins>
-       <plugin>
-         <groupId>org.apache.maven.plugins</groupId>
-         <artifactId>maven-eclipse-plugin</artifactId>
-         <version>2.9</version>
-         <configuration>
-           <downloadSources>true</downloadSources>
-         </configuration>
-       </plugin>
-       <plugin>
-         <artifactId>maven-compiler-plugin</artifactId>
-         <version>3.3</version>
-         <configuration>
-           <encoding>UTF-8</encoding>
-           <source>1.7</source>
-           <target>1.7</target>
-           <debug>true</debug>
-           <optimize>false</optimize>
-           <showDeprecation>true</showDeprecation>
-           <showWarnings>true</showWarnings>
-         </configuration>
-       </plugin>
-       <plugin>
-         <artifactId>maven-dependency-plugin</artifactId>
-         <version>2.8</version>
-         <executions>
-           <execution>
-             <id>copy-dependencies</id>
-             <phase>prepare-package</phase>
-             <goals>
-               <goal>copy-dependencies</goal>
-             </goals>
-             <configuration>
-               <outputDirectory>target/deps</outputDirectory>
-               <includeScope>runtime</includeScope>
-             </configuration>
-           </execution>
-         </executions>
-       </plugin>
-
-       <plugin>
-         <artifactId>maven-assembly-plugin</artifactId>
-         <executions>
-           <execution>
-             <id>app-package-assembly</id>
-             <phase>package</phase>
-             <goals>
-               <goal>single</goal>
-             </goals>
-             <configuration>
-               <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
-               <appendAssemblyId>false</appendAssemblyId>
-               <descriptors>
-                 <descriptor>src/assemble/appPackage.xml</descriptor>
-               </descriptors>
-               <archiverConfig>
-                 <defaultDirectoryMode>0755</defaultDirectoryMode>
-               </archiverConfig>                  
-               <archive>
-                 <manifestEntries>
-                   <Class-Path>${apex.apppackage.classpath}</Class-Path>
-                   <DT-Engine-Version>${apex.version}</DT-Engine-Version>
-                   <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
-                   <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
-                   <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
-                   <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
-                   <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
-                 </manifestEntries>
-               </archive>
-             </configuration>
-           </execution>
-         </executions>
-       </plugin>
-
-       <plugin>
-         <artifactId>maven-antrun-plugin</artifactId>
-         <version>1.7</version>
-         <executions>
-           <execution>
-             <phase>package</phase>
-             <configuration>
-               <target>
-                 <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
-                       tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
-               </target>
-             </configuration>
-             <goals>
-               <goal>run</goal>
-             </goals>
-           </execution>
-           <execution>
-             <!-- create resource directory for xml javadoc-->
-             <id>createJavadocDirectory</id>
-             <phase>generate-resources</phase>
-             <configuration>
-               <tasks>
-                 <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
-                 <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
-               </tasks>
-             </configuration>
-             <goals>
-               <goal>run</goal>
-             </goals>
-           </execution>
-         </executions>
-       </plugin>
-
-       <plugin>
-         <groupId>org.codehaus.mojo</groupId>
-         <artifactId>build-helper-maven-plugin</artifactId>
-         <version>1.9.1</version>
-         <executions>
-           <execution>
-             <id>attach-artifacts</id>
-             <phase>package</phase>
-             <goals>
-               <goal>attach-artifact</goal>
-             </goals>
-             <configuration>
-               <artifacts>
-                 <artifact>
-                   <file>target/${project.artifactId}-${project.version}.apa</file>
-                   <type>apa</type>
-                 </artifact>
-               </artifacts>
-               <skipAttach>false</skipAttach>
-             </configuration>
-           </execution>
-         </executions>
-       </plugin>
-
-      <!-- generate javdoc -->
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-javadoc-plugin</artifactId>
-        <executions>
-          <!-- generate xml javadoc -->
-          <execution>
-            <id>xml-doclet</id>
-            <phase>generate-resources</phase>
-            <goals>
-              <goal>javadoc</goal>
-            </goals>
-            <configuration>
-              <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
-              <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
-              <useStandardDocletOptions>false</useStandardDocletOptions>
-              <docletArtifact>
-                <groupId>com.github.markusbernhardt</groupId>
-                <artifactId>xml-doclet</artifactId>
-                <version>1.0.4</version>
-              </docletArtifact>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>xml-maven-plugin</artifactId>
-        <version>1.0</version>
-        <executions>
-          <execution>
-            <id>transform-xmljavadoc</id>
-            <phase>generate-resources</phase>
-            <goals>
-              <goal>transform</goal>
-            </goals>
-          </execution>
-        </executions>
-        <configuration>
-          <transformationSets>
-            <transformationSet>
-              <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
-              <includes>
-                <include>${project.artifactId}-${project.version}-javadoc.xml</include>
-              </includes>
-              <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
-              <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
-            </transformationSet>
-          </transformationSets>
-        </configuration>
-      </plugin>
-      <!-- copy xml javadoc to class jar -->
-      <plugin>
-        <artifactId>maven-resources-plugin</artifactId>
-        <version>2.6</version>
-        <executions>
-          <execution>
-            <id>copy-resources</id>
-            <phase>process-resources</phase>
-            <goals>
-              <goal>copy-resources</goal>
-            </goals>
-            <configuration>
-              <outputDirectory>${basedir}/target/classes</outputDirectory>
-              <resources>
-                <resource>
-                  <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
-                  <includes>
-                    <include>${project.artifactId}-${project.version}-javadoc.xml</include>
-                  </includes>
-                  <filtering>true</filtering>
-                </resource>
-              </resources>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-
-    </plugins>
-
-  </build>
-
-  <dependencies>
-    <!-- add your dependencies here -->
-    <dependency>
-      <groupId>org.apache.apex</groupId>
-      <artifactId>malhar-library</artifactId>
-      <version>3.4.0</version>
-      <!-- 
-           If you know that your application does not need transitive dependencies pulled in by malhar-library,
-           uncomment the following to reduce the size of your app package.
-      -->
-      <!--    
-      <exclusions>
-        <exclusion>
-          <groupId>*</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-      -->
-    </dependency>
-    <dependency>
-      <groupId>org.apache.apex</groupId>
-      <artifactId>malhar-contrib</artifactId>
-      <version>3.4.0</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.apex</groupId>
-      <artifactId>apex-common</artifactId>
-      <version>${apex.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>4.10</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.apex</groupId>
-      <artifactId>apex-engine</artifactId>
-      <version>${apex.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.8.2</artifactId>
-      <version>0.8.1</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-simple</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>log4j</groupId>
-          <artifactId>log4j</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.zookeeper</groupId>
-          <artifactId>zookeeper</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>info.batey.kafka</groupId>
-      <artifactId>kafka-unit</artifactId>
-      <version>0.3</version>
-      <exclusions>
-        <exclusion>
-          <groupId>*</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-  </dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/hdfs2kafka/src/assemble/appPackage.xml b/examples/hdfs2kafka/src/assemble/appPackage.xml
deleted file mode 100644
index 7ad071c..0000000
--- a/examples/hdfs2kafka/src/assemble/appPackage.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
-  <id>appPackage</id>
-  <formats>
-    <format>jar</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-  <fileSets>
-    <fileSet>
-      <directory>${basedir}/target/</directory>
-      <outputDirectory>/app</outputDirectory>
-      <includes>
-        <include>${project.artifactId}-${project.version}.jar</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/target/deps</directory>
-      <outputDirectory>/lib</outputDirectory>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/site/conf</directory>
-      <outputDirectory>/conf</outputDirectory>
-      <includes>
-        <include>*.xml</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/main/resources/META-INF</directory>
-      <outputDirectory>/META-INF</outputDirectory>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/main/resources/app</directory>
-      <outputDirectory>/app</outputDirectory>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/main/resources/resources</directory>
-      <outputDirectory>/resources</outputDirectory>
-    </fileSet>
-  </fileSets>
-
-</assembly>
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/src/main/java/com/example/myapexapp/Application.java
----------------------------------------------------------------------
diff --git a/examples/hdfs2kafka/src/main/java/com/example/myapexapp/Application.java b/examples/hdfs2kafka/src/main/java/com/example/myapexapp/Application.java
deleted file mode 100644
index 447ae1c..0000000
--- a/examples/hdfs2kafka/src/main/java/com/example/myapexapp/Application.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.example.myapexapp;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
-import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator;
-
-@ApplicationAnnotation(name="Hdfs2Kafka")
-public class Application implements StreamingApplication
-{
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    LineByLineFileInputOperator in = dag.addOperator("lines",
-                                                     LineByLineFileInputOperator.class);
-
-    KafkaSinglePortOutputOperator<String,String> out = dag.addOperator("kafkaOutput", new KafkaSinglePortOutputOperator<String,String>());
-
-    dag.addStream("data", in.output, out.inputPort).setLocality(Locality.CONTAINER_LOCAL);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/hdfs2kafka/src/main/resources/META-INF/properties.xml b/examples/hdfs2kafka/src/main/resources/META-INF/properties.xml
deleted file mode 100644
index 7c624ca..0000000
--- a/examples/hdfs2kafka/src/main/resources/META-INF/properties.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-<?xml version="1.0"?>
-<configuration>
-  <property>
-    <name>dt.operator.kafkaOutput.prop.topic</name>
-    <value>hdfs2kafka</value>
-  </property>
-  <property>
-    <name>dt.operator.lines.prop.directory</name>
-    <value>/tmp/hdfs2kafka</value>
-  </property>
-  <property>
-    <name>dt.operator.kafkaOutput.prop.producerProperties</name>
-    <value>serializer.class=kafka.serializer.StringEncoder,producer.type=async,metadata.broker.list=localhost:9092</value>
-  </property>
-</configuration>
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/src/test/java/com/example/myapexapp/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/hdfs2kafka/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/hdfs2kafka/src/test/java/com/example/myapexapp/ApplicationTest.java
deleted file mode 100644
index 2c415be..0000000
--- a/examples/hdfs2kafka/src/test/java/com/example/myapexapp/ApplicationTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package com.example.myapexapp;
-
-import java.io.File;
-import java.io.IOException;
-
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-import javax.validation.ConstraintViolationException;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import org.junit.Test;
-
-import info.batey.kafka.unit.KafkaUnitRule;
-import info.batey.kafka.unit.KafkaUnit;
-
-import kafka.producer.KeyedMessage;
-
-import com.datatorrent.api.LocalMode;
-import com.example.myapexapp.Application;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test the DAG declaration in local mode.
- */
-public class ApplicationTest {
-  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
-  private static final String TOPIC = "hdfs2kafka";
-  private static final String directory = "target/hdfs2kafka";
-  private static final String FILE_NAME = "messages.txt";
-
-  private static final int zkPort = 2181;
-  private static final int  brokerPort = 9092;
-  private static final String BROKER = "localhost:" + brokerPort;
-  //private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0";     // first part
-
-  // test messages
-  private static String[] lines =
-  {
-    "1st line",
-    "2nd line",
-    "3rd line",
-    "4th line",
-    "5th line",
-  };
-
-  // broker port must match properties.xml
-  @Rule
-  public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
-
-
-  @Test
-  public void testApplication() throws IOException, Exception {
-    try {
-      // create file in monitored HDFS directory
-      createFile();
-
-      // run app asynchronously; terminate after results are checked
-      LocalMode.Controller lc = asyncRun();
-
-      // get messages from Kafka topic and compare with input
-      chkOutput();
-
-      lc.shutdown();
-    } catch (ConstraintViolationException e) {
-      Assert.fail("constraint violations: " + e.getConstraintViolations());
-    }
-  }
-
-  // create a file with content from 'lines'
-  private void createFile() throws IOException {
-    // remove old file and create new one
-    File file = new File(directory, FILE_NAME);
-    FileUtils.deleteQuietly(file);
-    try {
-      String data = StringUtils.join(lines, "\n") + "\n";    // add final newline
-      FileUtils.writeStringToFile(file, data, "UTF-8");
-    } catch (IOException e) {
-      LOG.error("Error: Failed to create file {} in {}", FILE_NAME, directory);
-      e.printStackTrace();
-    }
-    LOG.debug("Created file {} with {} lines in {}",
-              FILE_NAME, lines.length, directory);
-  }
-
-  private LocalMode.Controller asyncRun() throws Exception {
-    Configuration conf = getConfig();
-    LocalMode lma = LocalMode.newInstance();
-    lma.prepareDAG(new Application(), conf);
-    LocalMode.Controller lc = lma.getController();
-    lc.runAsync();
-    return lc;
-  }
-
-  private Configuration getConfig() {
-      Configuration conf = new Configuration(false);
-      conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
-      conf.set("dt.operator.lines.prop.directory", directory);
-      return conf;
-  }
-
-  private void chkOutput() throws Exception {
-    KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
-    List<String> messages = null;
-
-    // wait for messages to appear in kafka
-    Thread.sleep(10000);
-
-    try {
-      messages = ku.readMessages(TOPIC, lines.length);
-    } catch (Exception e) {
-      LOG.error("Error: Got exception {}", e);
-    }
-
-    int i = 0;
-    for (String msg : messages) {
-      assertTrue("Error: message mismatch", msg.equals(lines[i]));
-      ++i;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/hdfs2kafka/src/test/resources/log4j.properties b/examples/hdfs2kafka/src/test/resources/log4j.properties
deleted file mode 100644
index 98544e8..0000000
--- a/examples/hdfs2kafka/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,22 +0,0 @@
-log4j.rootLogger=DEBUG,CONSOLE
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-
-log4j.appender.RFA=org.apache.log4j.RollingFileAppender
-log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
-log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.RFA.File=/tmp/app.log
-
-# to enable, add SYSLOG to rootLogger
-log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
-log4j.appender.SYSLOG.syslogHost=127.0.0.1
-log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
-log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
-log4j.appender.SYSLOG.Facility=LOCAL1
-
-#log4j.logger.org.apache.commons.beanutils=warn
-log4j.logger.com.datatorrent=debug
-log4j.logger.org.apache.apex=debug
-log4j.logger.org=info

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/README.md
----------------------------------------------------------------------
diff --git a/examples/kafka/README.md b/examples/kafka/README.md
index 1cecdaa..1a7a9c4 100644
--- a/examples/kafka/README.md
+++ b/examples/kafka/README.md
@@ -1,6 +1,16 @@
+## Kafka to HDFS example :
+
 This sample application shows how to read lines from a Kafka topic using the new (0.9)
 Kafka input operator and write them out to HDFS using rolling files with a bounded size.
 
 The output files start out with a `.tmp` extension and get renamed when they reach the
 size bound.  Additional operators to perform parsing, aggregation or filtering can be
 inserted into this pipeline as needed.
+
+## HDFS to Kafka example :
+
+This sample application shows how to read lines from files in HDFS and write
+them out to a Kafka topic. Each line of the input file is considered a separate
+message. The topic name, the name of the directory that is monitored for input
+files, and other parameters are configurable in `META-INF/properties-hdfs2kafka.xml`.
+
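
For a quick local run, the tests added in this commit drive these DAGs through
Apex local mode. A minimal sketch for the hdfs2kafka example, reusing the
Application class and properties file added below (the RunHdfs2Kafka wrapper
class and the 30-second run window are illustrative, not part of the commit):

    import org.apache.hadoop.conf.Configuration;

    import org.apache.apex.examples.kafka.hdfs2kafka.Application;

    import com.datatorrent.api.LocalMode;

    public class RunHdfs2Kafka
    {
      public static void main(String[] args) throws Exception
      {
        // load the packaged defaults, then point the file reader at a local directory
        Configuration conf = new Configuration(false);
        conf.addResource(RunHdfs2Kafka.class.getResourceAsStream(
            "/META-INF/properties-hdfs2kafka.xml"));
        conf.set("dt.operator.lines.prop.directory", "/tmp/hdfs2kafka");

        LocalMode lma = LocalMode.newInstance();
        lma.prepareDAG(new Application(), conf);  // builds the lines -> kafkaOutput DAG
        lma.getController().run(30 * 1000);       // run locally for ~30s, then exit
      }
    }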

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/XmlJavadocCommentsExtractor.xsl
----------------------------------------------------------------------
diff --git a/examples/kafka/XmlJavadocCommentsExtractor.xsl b/examples/kafka/XmlJavadocCommentsExtractor.xsl
deleted file mode 100644
index 08075a9..0000000
--- a/examples/kafka/XmlJavadocCommentsExtractor.xsl
+++ /dev/null
@@ -1,44 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed under the Apache License, Version 2.0 (the "License");
-    you may not use this file except in compliance with the License.
-    You may obtain a copy of the License at
-
-            http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-
--->
-
-<!--
-    Document   : XmlJavadocCommentsExtractor.xsl
-    Created on : September 16, 2014, 11:30 AM
-    Description:
-        The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
--->
-
-<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
-  <xsl:output method="xml" standalone="yes"/>
-
-  <!-- copy xml by selecting only the following nodes, attributes and text -->
-  <xsl:template match="node()|text()|@*">
-    <xsl:copy>
-      <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
-    </xsl:copy>
-  </xsl:template>
-
-  <!-- Strip off the following paths from the selected xml -->
-  <xsl:template match="//root/package/interface/interface
-                      |//root/package/interface/method/@qualified
-                      |//root/package/class/interface
-                      |//root/package/class/class
-                      |//root/package/class/method/@qualified
-                      |//root/package/class/field/@qualified" />
-
-  <xsl:strip-space elements="*"/>
-</xsl:stylesheet>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/examples/kafka/pom.xml b/examples/kafka/pom.xml
index ce325bf..c21bb20 100644
--- a/examples/kafka/pom.xml
+++ b/examples/kafka/pom.xml
@@ -2,258 +2,39 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   
-  <groupId>com.example</groupId>
-  <version>1.0-SNAPSHOT</version>
-  <artifactId>kafka2hdfs</artifactId>
-  <packaging>jar</packaging>
-
-  <!-- change these to the appropriate values -->
-  <name>New Kafka Input Operator</name>
-  <description>Example Use of New Kafka Input Operator</description>
-
-  <properties>
-    <!-- change this if you desire to use a different version of Apex Core -->
-    <apex.version>3.5.0</apex.version>
-    <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
-    <malhar.version>3.6.0</malhar.version>
-  </properties>
-
-  <build>
-    <plugins>
-       <plugin>
-         <groupId>org.apache.maven.plugins</groupId>
-         <artifactId>maven-eclipse-plugin</artifactId>
-         <version>2.9</version>
-         <configuration>
-           <downloadSources>true</downloadSources>
-         </configuration>
-       </plugin>
-       <plugin>
-         <artifactId>maven-compiler-plugin</artifactId>
-         <version>3.3</version>
-         <configuration>
-           <encoding>UTF-8</encoding>
-           <source>1.7</source>
-           <target>1.7</target>
-           <debug>true</debug>
-           <optimize>false</optimize>
-           <showDeprecation>true</showDeprecation>
-           <showWarnings>true</showWarnings>
-         </configuration>
-       </plugin>
-       <plugin>
-         <artifactId>maven-dependency-plugin</artifactId>
-         <version>2.8</version>
-         <executions>
-           <execution>
-             <id>copy-dependencies</id>
-             <phase>prepare-package</phase>
-             <goals>
-               <goal>copy-dependencies</goal>
-             </goals>
-             <configuration>
-               <outputDirectory>target/deps</outputDirectory>
-               <includeScope>runtime</includeScope>
-             </configuration>
-           </execution>
-         </executions>
-       </plugin>
-
-       <plugin>
-         <artifactId>maven-assembly-plugin</artifactId>
-         <executions>
-           <execution>
-             <id>app-package-assembly</id>
-             <phase>package</phase>
-             <goals>
-               <goal>single</goal>
-             </goals>
-             <configuration>
-               <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
-               <appendAssemblyId>false</appendAssemblyId>
-               <descriptors>
-                 <descriptor>src/assemble/appPackage.xml</descriptor>
-               </descriptors>
-               <archiverConfig>
-                 <defaultDirectoryMode>0755</defaultDirectoryMode>
-               </archiverConfig>                  
-               <archive>
-                 <manifestEntries>
-                   <Class-Path>${apex.apppackage.classpath}</Class-Path>
-                   <DT-Engine-Version>${apex.version}</DT-Engine-Version>
-                   <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
-                   <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
-                   <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
-                   <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
-                   <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
-                 </manifestEntries>
-               </archive>
-             </configuration>
-           </execution>
-         </executions>
-       </plugin>
-
-       <plugin>
-         <artifactId>maven-antrun-plugin</artifactId>
-         <version>1.7</version>
-         <executions>
-           <execution>
-             <phase>package</phase>
-             <configuration>
-               <target>
-                 <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
-                       tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
-               </target>
-             </configuration>
-             <goals>
-               <goal>run</goal>
-             </goals>
-           </execution>
-           <execution>
-             <!-- create resource directory for xml javadoc-->
-             <id>createJavadocDirectory</id>
-             <phase>generate-resources</phase>
-             <configuration>
-               <tasks>
-                 <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
-                 <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
-               </tasks>
-             </configuration>
-             <goals>
-               <goal>run</goal>
-             </goals>
-           </execution>
-         </executions>
-       </plugin>
-
-       <plugin>
-         <groupId>org.codehaus.mojo</groupId>
-         <artifactId>build-helper-maven-plugin</artifactId>
-         <version>1.9.1</version>
-         <executions>
-           <execution>
-             <id>attach-artifacts</id>
-             <phase>package</phase>
-             <goals>
-               <goal>attach-artifact</goal>
-             </goals>
-             <configuration>
-               <artifacts>
-                 <artifact>
-                   <file>target/${project.artifactId}-${project.version}.apa</file>
-                   <type>apa</type>
-                 </artifact>
-               </artifacts>
-               <skipAttach>false</skipAttach>
-             </configuration>
-           </execution>
-         </executions>
-       </plugin>
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.8.0-SNAPSHOT</version>
+  </parent>
 
-      <!-- generate javdoc -->
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-javadoc-plugin</artifactId>
-        <executions>
-          <!-- generate xml javadoc -->
-          <execution>
-            <id>xml-doclet</id>
-            <phase>generate-resources</phase>
-            <goals>
-              <goal>javadoc</goal>
-            </goals>
-            <configuration>
-              <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
-              <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
-              <useStandardDocletOptions>false</useStandardDocletOptions>
-              <docletArtifact>
-                <groupId>com.github.markusbernhardt</groupId>
-                <artifactId>xml-doclet</artifactId>
-                <version>1.0.4</version>
-              </docletArtifact>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>xml-maven-plugin</artifactId>
-        <version>1.0</version>
-        <executions>
-          <execution>
-            <id>transform-xmljavadoc</id>
-            <phase>generate-resources</phase>
-            <goals>
-              <goal>transform</goal>
-            </goals>
-          </execution>
-        </executions>
-        <configuration>
-          <transformationSets>
-            <transformationSet>
-              <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
-              <includes>
-                <include>${project.artifactId}-${project.version}-javadoc.xml</include>
-              </includes>
-              <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
-              <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
-            </transformationSet>
-          </transformationSets>
-        </configuration>
-      </plugin>
-      <!-- copy xml javadoc to class jar -->
-      <plugin>
-        <artifactId>maven-resources-plugin</artifactId>
-        <version>2.6</version>
-        <executions>
-          <execution>
-            <id>copy-resources</id>
-            <phase>process-resources</phase>
-            <goals>
-              <goal>copy-resources</goal>
-            </goals>
-            <configuration>
-              <outputDirectory>${basedir}/target/classes</outputDirectory>
-              <resources>
-                <resource>
-                  <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
-                  <includes>
-                    <include>${project.artifactId}-${project.version}-javadoc.xml</include>
-                  </includes>
-                  <filtering>true</filtering>
-                </resource>
-              </resources>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-
-    </plugins>
-
-  </build>
+  <artifactId>malhar-examples-kafka</artifactId>
+  <packaging>jar</packaging>
 
+  <name>Apache Apex Malhar Kafka examples</name>
+  <description>
+    kafka2hdfs is an example showing how to read lines from a Kafka topic using the new (0.9)
+    Kafka input operator and write them out to HDFS.
+    hdfs2kafka is a simple application to transfer data from HDFS to Kafka.
+  </description>
+  
   <dependencies>
     <!-- add your dependencies here -->
     <dependency>
       <groupId>org.apache.apex</groupId>
       <artifactId>malhar-kafka</artifactId>
-      <version>${malhar.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.kafka</groupId>
-          <artifactId>kafka-clients</artifactId>
-        </exclusion>
-      </exclusions>
+      <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.apex</groupId>
-      <artifactId>malhar-library</artifactId>
-      <version>${malhar.version}</version>
-      <!--
-           If you know that your application does not need transitive dependencies pulled in by malhar-library,
-           uncomment the following to reduce the size of your app package.
-      -->
+      <artifactId>apex-engine</artifactId>
+      <version>${apex.core.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>info.batey.kafka</groupId>
+      <artifactId>kafka-unit</artifactId>
+      <version>0.4</version>  
       <exclusions>
         <exclusion>
           <groupId>*</groupId>
@@ -261,37 +42,10 @@
         </exclusion>
       </exclusions>
     </dependency>
-
     <dependency>
       <groupId>org.apache.apex</groupId>
-      <artifactId>apex-common</artifactId>
-      <version>${apex.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>4.10</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.apex</groupId>
-      <artifactId>apex-engine</artifactId>
-      <version>${apex.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.11</artifactId>
-      <version>0.9.0.1</version>
-    </dependency>
-
-    <dependency>
-      <groupId>info.batey.kafka</groupId>
-      <artifactId>kafka-unit</artifactId>
-      <version>0.4</version>
+      <artifactId>malhar-contrib</artifactId>
+      <version>${project.version}</version>
       <exclusions>
         <exclusion>
           <groupId>*</groupId>
@@ -299,9 +53,12 @@
         </exclusion>
       </exclusions>
     </dependency>
-
-
-
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>0.9.0.0</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/java/com/example/myapexapp/KafkaApp.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/com/example/myapexapp/KafkaApp.java b/examples/kafka/src/main/java/com/example/myapexapp/KafkaApp.java
deleted file mode 100644
index 09089eb..0000000
--- a/examples/kafka/src/main/java/com/example/myapexapp/KafkaApp.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.example.myapexapp;
-
-import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
-import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-@ApplicationAnnotation(name="Kafka2HDFS")
-public class KafkaApp implements StreamingApplication
-{
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    KafkaSinglePortInputOperator in
-      = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
-
-    in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
-    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
-
-    dag.addStream("data", in.outputPort, out.input);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java b/examples/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java
deleted file mode 100644
index 2b184c6..0000000
--- a/examples/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package com.example.myapexapp;
-
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-
-import javax.validation.constraints.NotNull;
-
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
-
-/**
- * Converts each tuple to a string and writes it as a new line to the output file
- */
-public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
-{
-  private static final String NL = System.lineSeparator();
-  private static final Charset CS = StandardCharsets.UTF_8;
-
-  @NotNull
-  private String baseName;
-
-  @Override
-  public byte[] getBytesForTuple(byte[] t) {
-    String result = new String(t, CS) + NL;
-    return result.getBytes(CS);
- }
-
-  @Override
-  protected String getFileName(byte[] tuple) {
-    return baseName;
-  }
-
-  public String getBaseName() { return baseName; }
-  public void setBaseName(String v) { baseName = v; }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java
new file mode 100644
index 0000000..646c8e8
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java
@@ -0,0 +1,26 @@
+package org.apache.apex.examples.kafka.hdfs2kafka;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
+import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator;
+
+@ApplicationAnnotation(name="Hdfs2Kafka")
+public class Application implements StreamingApplication
+{
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    LineByLineFileInputOperator in = dag.addOperator("lines",
+                                                     LineByLineFileInputOperator.class);
+
+    KafkaSinglePortOutputOperator<String,String> out = dag.addOperator("kafkaOutput", new KafkaSinglePortOutputOperator<String,String>());
+
+    dag.addStream("data", in.output, out.inputPort).setLocality(Locality.CONTAINER_LOCAL);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java
new file mode 100644
index 0000000..15f0182
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java
@@ -0,0 +1,26 @@
+package org.apache.apex.examples.kafka.kafka2hdfs;
+
+import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+@ApplicationAnnotation(name="Kafka2HDFS")
+public class KafkaApp implements StreamingApplication
+{
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    KafkaSinglePortInputOperator in
+      = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
+
+    in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
+    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
+
+    dag.addStream("data", in.outputPort, out.input);
+  }
+}
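
The kafka2hdfs DAG above is wired from named operators ("kafkaIn", "fileOut")
and configured entirely through their properties. For reference, the
ApplicationTest for this example (further down in this commit) builds that
configuration programmatically; a condensed copy of its getConfig():

    private Configuration getConfig()
    {
      Configuration conf = new Configuration(false);
      // Kafka input: topic, broker, and where to start reading
      String pre = "dt.operator.kafkaIn.prop.";
      conf.setEnum(pre + "initialOffset", AbstractKafkaInputOperator.InitialOffset.EARLIEST);
      conf.setInt(pre + "initialPartitionCount", 1);
      conf.set(pre + "topics", "kafka2hdfs");
      conf.set(pre + "clusters", "localhost:9092");

      // file output: rolling parts are written as <baseName>.0, <baseName>.1, ...
      pre = "dt.operator.fileOut.prop.";
      conf.set(pre + "filePath", "/tmp/FromKafka");
      conf.set(pre + "baseName", "test");
      conf.setInt(pre + "maxLength", 40);        // roll to a new part file past this size
      conf.setInt(pre + "rotationWindows", 3);
      return conf;
    }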

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java
new file mode 100644
index 0000000..ef40a69
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java
@@ -0,0 +1,34 @@
+package org.apache.apex.examples.kafka.kafka2hdfs;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+
+/**
+ * Converts each tuple to a string and writes it as a new line to the output file
+ */
+public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
+{
+  private static final String NL = System.lineSeparator();
+  private static final Charset CS = StandardCharsets.UTF_8;
+
+  @NotNull
+  private String baseName;
+
+  @Override
+  public byte[] getBytesForTuple(byte[] t) {
+    String result = new String(t, CS) + NL;
+    return result.getBytes(CS);
+ }
+
+  @Override
+  protected String getFileName(byte[] tuple) {
+    return baseName;
+  }
+
+  public String getBaseName() { return baseName; }
+  public void setBaseName(String v) { baseName = v; }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml b/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml
new file mode 100644
index 0000000..7c624ca
--- /dev/null
+++ b/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0"?>
+<configuration>
+  <property>
+    <name>dt.operator.kafkaOutput.prop.topic</name>
+    <value>hdfs2kafka</value>
+  </property>
+  <property>
+    <name>dt.operator.lines.prop.directory</name>
+    <value>/tmp/hdfs2kafka</value>
+  </property>
+  <property>
+    <name>dt.operator.kafkaOutput.prop.producerProperties</name>
+    <value>serializer.class=kafka.serializer.StringEncoder,producer.type=async,metadata.broker.list=localhost:9092</value>
+  </property>
+</configuration>
+
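
Each dt.operator.<name>.prop.<key> entry above is applied by the platform to
the operator of that name through the corresponding setter (the usual Apex
bean-property convention), so the file is functionally equivalent to
hard-coding the values in populateDAG. A sketch of that equivalence for the
kafkaOutput operator (the property file is preferred since it keeps the
values configurable at launch time):

    KafkaSinglePortOutputOperator<String,String> out = dag.addOperator("kafkaOutput",
        new KafkaSinglePortOutputOperator<String,String>());
    out.setTopic("hdfs2kafka");  // dt.operator.kafkaOutput.prop.topic
    out.setProducerProperties(   // dt.operator.kafkaOutput.prop.producerProperties
        "serializer.class=kafka.serializer.StringEncoder,"
        + "producer.type=async,metadata.broker.list=localhost:9092");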

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/test/java/com/example/myapexapp/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/kafka/src/test/java/com/example/myapexapp/ApplicationTest.java
deleted file mode 100644
index 635d25a..0000000
--- a/examples/kafka/src/test/java/com/example/myapexapp/ApplicationTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Put your copyright and license info here.
- */
-package com.example.myapexapp;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-
-import java.util.ArrayList;
-
-import javax.validation.ConstraintViolationException;
-
-import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
-import org.apache.hadoop.conf.Configuration;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import info.batey.kafka.unit.KafkaUnitRule;
-import info.batey.kafka.unit.KafkaUnit;
-
-import kafka.producer.KeyedMessage;
-
-import com.datatorrent.api.LocalMode;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test the DAG declaration in local mode.
- */
-public class ApplicationTest {
-  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
-  private static final String TOPIC = "kafka2hdfs";
-
-  private static final int zkPort = 2181;
-  private static final int  brokerPort = 9092;
-  private static final String BROKER = "localhost:" + brokerPort;
-  private static final String FILE_NAME = "test";
-  private static final String FILE_DIR  = "/tmp/FromKafka";
-  private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0";     // first part
-
-  // test messages
-  private static String[] lines =
-  {
-    "1st line",
-    "2nd line",
-    "3rd line",
-    "4th line",
-    "5th line",
-  };
-
-  // broker port must match properties.xml
-  @Rule
-  public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
-
-  @Test
-  public void testApplication() throws Exception {
-    try {
-      // delete output file if it exists
-      File file = new File(FILE_PATH);
-      file.delete();
-
-      // write messages to Kafka topic
-      writeToTopic();
-
-      // run app asynchronously; terminate after results are checked
-      LocalMode.Controller lc = asyncRun();
-
-      // check for presence of output file
-      chkOutput();
-
-      // compare output lines to input
-      compare();
-
-      lc.shutdown();
-    } catch (ConstraintViolationException e) {
-      Assert.fail("constraint violations: " + e.getConstraintViolations());
-    }
-  }
-
-  private void writeToTopic() {
-    KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
-    ku.createTopic(TOPIC);
-    for (String line : lines) {
-      KeyedMessage<String, String> kMsg = new KeyedMessage<>(TOPIC, line);
-      ku.sendMessages(kMsg);
-    }
-    LOG.debug("Sent messages to topic {}", TOPIC);
-  }
-
-  private Configuration getConfig() {
-      Configuration conf = new Configuration(false);
-      String pre = "dt.operator.kafkaIn.prop.";
-      conf.setEnum(pre + "initialOffset",
-                   AbstractKafkaInputOperator.InitialOffset.EARLIEST);
-      conf.setInt(pre + "initialPartitionCount", 1);
-      conf.set(   pre + "topics",                TOPIC);
-      conf.set(   pre + "clusters",              BROKER);
-
-      pre = "dt.operator.fileOut.prop.";
-      conf.set(   pre + "filePath",        FILE_DIR);
-      conf.set(   pre + "baseName",        FILE_NAME);
-      conf.setInt(pre + "maxLength",       40);
-      conf.setInt(pre + "rotationWindows", 3);
-
-      return conf;
-  }
-
-  private LocalMode.Controller asyncRun() throws Exception {
-    Configuration conf = getConfig();
-    LocalMode lma = LocalMode.newInstance();
-    lma.prepareDAG(new KafkaApp(), conf);
-    LocalMode.Controller lc = lma.getController();
-    lc.runAsync();
-    return lc;
-  }
-
-  private static void chkOutput() throws Exception {
-    File file = new File(FILE_PATH);
-    final int MAX = 60;
-    for (int i = 0; i < MAX && (! file.exists()); ++i ) {
-      LOG.debug("Sleeping, i = {}", i);
-      Thread.sleep(1000);
-    }
-    if (! file.exists()) {
-      String msg = String.format("Error: %s not found after %d seconds%n", FILE_PATH, MAX);
-      throw new RuntimeException(msg);
-    }
-  }
-
-  private static void compare() throws Exception {
-    // read output file
-    File file = new File(FILE_PATH);
-    BufferedReader br = new BufferedReader(new FileReader(file));
-    ArrayList<String> list = new ArrayList<>();
-    String line;
-    while (null != (line = br.readLine())) {
-      list.add(line);
-    }
-    br.close();
-
-    // compare
-    Assert.assertEquals("number of lines", list.size(), lines.length);
-    for (int i = 0; i < lines.length; ++i) {
-      assertTrue("line", lines[i].equals(list.get(i)));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
new file mode 100644
index 0000000..aa63ee5
--- /dev/null
+++ b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
@@ -0,0 +1,125 @@
+package org.apache.apex.examples.kafka.hdfs2kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+import info.batey.kafka.unit.KafkaUnit;
+import info.batey.kafka.unit.KafkaUnitRule;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class ApplicationTest {
+  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
+  private static final String TOPIC = "hdfs2kafka";
+  private static final String directory = "target/hdfs2kafka";
+  private static final String FILE_NAME = "messages.txt";
+
+  private static final int zkPort = 2181;
+  private static final int brokerPort = 9092;
+  private static final String BROKER = "localhost:" + brokerPort;
+
+  // test messages
+  private static String[] lines =
+  {
+    "1st line",
+    "2nd line",
+    "3rd line",
+    "4th line",
+    "5th line",
+  };
+
+  // broker port must match properties-hdfs2kafka.xml
+  @Rule
+  public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
+
+  @Test
+  public void testApplication() throws Exception {
+    try {
+      // create file in monitored HDFS directory
+      createFile();
+
+      // run app asynchronously; terminate after results are checked
+      LocalMode.Controller lc = asyncRun();
+
+      // get messages from Kafka topic and compare with input
+      chkOutput();
+
+      lc.shutdown();
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+  // create a file with content from 'lines'
+  private void createFile() throws IOException {
+    // remove old file and create new one; let any IOException propagate
+    // so the test fails instead of logging success after a swallowed error
+    File file = new File(directory, FILE_NAME);
+    FileUtils.deleteQuietly(file);
+    String data = StringUtils.join(lines, "\n") + "\n";    // add final newline
+    FileUtils.writeStringToFile(file, data, "UTF-8");
+    LOG.debug("Created file {} with {} lines in {}",
+              FILE_NAME, lines.length, directory);
+  }
+
+  private LocalMode.Controller asyncRun() throws Exception {
+    Configuration conf = getConfig();
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(new Application(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+    return lc;
+  }
+
+  private Configuration getConfig() {
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-hdfs2kafka.xml"));
+    conf.set("dt.operator.lines.prop.directory", directory);
+    return conf;
+  }
+
+  private void chkOutput() throws Exception {
+    KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
+    List<String> messages = null;
+
+    // wait for messages to appear in kafka
+    Thread.sleep(10000);
+
+    try {
+      messages = ku.readMessages(TOPIC, lines.length);
+    } catch (Exception e) {
+      // fail immediately rather than fall through to a NullPointerException below
+      LOG.error("Error: failed to read messages from topic {}", TOPIC, e);
+      Assert.fail("failed to read " + lines.length + " messages from topic " + TOPIC);
+    }
+
+    int i = 0;
+    for (String msg : messages) {
+      assertTrue("Error: message mismatch", msg.equals(lines[i]));
+      ++i;
+    }
+  }
+
+}
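
The test above pulls most of its settings from META-INF/properties-hdfs2kafka.xml and
overrides only the monitored directory via the generic
"dt.operator.<name>.prop.<property>" keys. A minimal sketch of building the same kind of
Configuration outside the test follows; "lines" is the reader operator name implied by
the override above, while "kafkaOut" and its "topic" property are hypothetical names
that must be checked against the ones declared in Application.java:

    import org.apache.hadoop.conf.Configuration;

    public class Hdfs2KafkaConfigSketch
    {
      public static Configuration build()
      {
        Configuration conf = new Configuration(false);
        // start from the defaults packaged with the application
        conf.addResource(Hdfs2KafkaConfigSketch.class.getResourceAsStream(
            "/META-INF/properties-hdfs2kafka.xml"));
        // "lines" is the file-reader operator name used by the test above
        conf.set("dt.operator.lines.prop.directory", "target/hdfs2kafka");
        // hypothetical operator/property names -- verify against Application.java
        conf.set("dt.operator.kafkaOut.prop.topic", "hdfs2kafka");
        return conf;
      }
    }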

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
new file mode 100644
index 0000000..80d84fa
--- /dev/null
+++ b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.kafka.kafka2hdfs;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+import info.batey.kafka.unit.KafkaUnit;
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class ApplicationTest {
+  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
+  private static final String TOPIC = "kafka2hdfs";
+
+  private static final int zkPort = 2181;
+  private static final int brokerPort = 9092;
+  private static final String BROKER = "localhost:" + brokerPort;
+  private static final String FILE_NAME = "test";
+  private static final String FILE_DIR  = "/tmp/FromKafka";
+  private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0";     // first part
+
+  // test messages
+  private static String[] lines =
+  {
+    "1st line",
+    "2nd line",
+    "3rd line",
+    "4th line",
+    "5th line",
+  };
+
+  // broker port must match properties.xml
+  @Rule
+  public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
+
+  @Test
+  public void testApplication() throws Exception {
+    try {
+      // delete output file if it exists
+      File file = new File(FILE_PATH);
+      file.delete();
+
+      // write messages to Kafka topic
+      writeToTopic();
+
+      // run app asynchronously; terminate after results are checked
+      LocalMode.Controller lc = asyncRun();
+
+      // check for presence of output file
+      chkOutput();
+
+      // compare output lines to input
+      compare();
+
+      lc.shutdown();
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+  private void writeToTopic() {
+    KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
+    ku.createTopic(TOPIC);
+    for (String line : lines) {
+      KeyedMessage<String, String> kMsg = new KeyedMessage<>(TOPIC, line);
+      ku.sendMessages(kMsg);
+    }
+    LOG.debug("Sent messages to topic {}", TOPIC);
+  }
+
+  private Configuration getConfig() {
+    Configuration conf = new Configuration(false);
+    String pre = "dt.operator.kafkaIn.prop.";
+    conf.setEnum(pre + "initialOffset",
+                 AbstractKafkaInputOperator.InitialOffset.EARLIEST);
+    conf.setInt(pre + "initialPartitionCount", 1);
+    conf.set(   pre + "topics",                TOPIC);
+    conf.set(   pre + "clusters",              BROKER);
+
+    pre = "dt.operator.fileOut.prop.";
+    conf.set(   pre + "filePath",        FILE_DIR);
+    conf.set(   pre + "baseName",        FILE_NAME);
+    conf.setInt(pre + "maxLength",       40);
+    conf.setInt(pre + "rotationWindows", 3);
+
+    return conf;
+  }
+
+  private LocalMode.Controller asyncRun() throws Exception {
+    Configuration conf = getConfig();
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(new KafkaApp(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+    return lc;
+  }
+
+  private static void chkOutput() throws Exception {
+    File file = new File(FILE_PATH);
+    final int MAX = 60;
+    for (int i = 0; i < MAX && (! file.exists()); ++i ) {
+      LOG.debug("Sleeping, i = {}", i);
+      Thread.sleep(1000);
+    }
+    if (! file.exists()) {
+      String msg = String.format("Error: %s not found after %d seconds%n", FILE_PATH, MAX);
+      throw new RuntimeException(msg);
+    }
+  }
+
+  private static void compare() throws Exception {
+    // read output file; try-with-resources closes the reader even on error
+    File file = new File(FILE_PATH);
+    ArrayList<String> list = new ArrayList<>();
+    try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+      String line;
+      while (null != (line = br.readLine())) {
+        list.add(line);
+      }
+    }
+
+    // compare
+    Assert.assertEquals("number of lines", list.size(), lines.length);
+    for (int i = 0; i < lines.length; ++i) {
+      assertTrue("line", lines[i].equals(list.get(i)));
+    }
+  }
+}
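
The chkOutput() above returns as soon as the first output part appears, but fileOut is
configured to roll (maxLength 40, rotationWindows 3), so a more defensive test could
also wait for the file to stop growing before compare() reads it. A minimal sketch of
such a helper, using only the JDK; the class name, method name, and timeout are
illustrative and not part of the example application:

    import java.io.File;

    public final class FileWait
    {
      // Wait until the file exists and its size is unchanged between two
      // consecutive one-second polls; give up after timeoutMs milliseconds.
      public static boolean waitForStableFile(File file, long timeoutMs)
          throws InterruptedException
      {
        long deadline = System.currentTimeMillis() + timeoutMs;
        long lastSize = -1;
        while (System.currentTimeMillis() < deadline) {
          if (file.exists()) {
            long size = file.length();
            if (size > 0 && size == lastSize) {
              return true;   // no growth since the previous poll
            }
            lastSize = size;
          }
          Thread.sleep(1000);
        }
        return false;
      }
    }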

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 8412c8f..16cfe26 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -197,6 +197,7 @@
     <module>recordReader</module>
     <module>throttle</module>
     <module>transform</module>
+    <module>kafka</module>
   </modules>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9bccc18..5b531ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,7 +196,6 @@
     <profile>
       <id>all-modules</id>
       <modules>
-        <module>kafka</module>
         <module>hive</module>
         <module>stream</module>
         <module>benchmark</module>
@@ -210,6 +209,7 @@
   <modules>
     <module>library</module>
     <module>contrib</module>
+    <module>kafka</module>
     <module>examples</module>
   </modules>