You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2017/04/24 16:16:05 UTC

[3/3] apex-malhar git commit: SPOI-7941 Add Kafka 0.9 example -- kafka to hdfs SPOI-8632 Example to read from HDFS and write to Kafka -- hdfs to kafka

SPOI-7941 Add Kafka 0.9 example -- kafka to hdfs
SPOI-8632 Example to read from HDFS and write to Kafka -- hdfs to kafka


Branch: refs/heads/master
Commit: 24027edf5b45f54815722ae14fcef9c5e13ff3f8
Parents: a4551b4
Author: Munagala V. Ramanath <>
Authored: Wed Mar 29 10:33:00 2017 -0700
Committer: Lakshmi Prasanna Velineni <>
Committed: Mon Apr 24 09:06:40 2017 -0700

 examples/hdfs2kafka/                   |   4 +
 .../hdfs2kafka/XmlJavadocCommentsExtractor.xsl  |  44 +++
 examples/hdfs2kafka/pom.xml                     | 315 +++++++++++++++++++
 examples/hdfs2kafka/src/assemble/appPackage.xml |  43 +++
 .../java/com/example/myapexapp/ |  26 ++
 .../src/main/resources/META-INF/properties.xml  |  16 +
 .../com/example/myapexapp/  | 132 ++++++++
 .../src/test/resources/         |  22 ++
 examples/kafka/                        |   6 +
 examples/kafka/XmlJavadocCommentsExtractor.xsl  |  44 +++
 examples/kafka/pom.xml                          | 307 ++++++++++++++++++
 examples/kafka/src/assemble/appPackage.xml      |  43 +++
 .../java/com/example/myapexapp/    |  26 ++
 .../example/myapexapp/   |  34 ++
 .../src/main/resources/META-INF/properties.xml  |  48 +++
 .../com/example/myapexapp/  | 152 +++++++++
 .../kafka/src/test/resources/   |  21 ++
 17 files changed, 1283 insertions(+)
diff --git a/examples/hdfs2kafka/ b/examples/hdfs2kafka/
new file mode 100644
index 0000000..166abd3
--- /dev/null
+++ b/examples/hdfs2kafka/
@@ -0,0 +1,4 @@
+This sample application shows how to read lines from files in HDFS and write
+them out to a Kafka topic. Each line of the input file is considered a separate
+message. The topic name, the name of the directory that is monitored for input
+files, and other parameters are configurable in `META-INF/properties.xml`.
diff --git a/examples/hdfs2kafka/XmlJavadocCommentsExtractor.xsl b/examples/hdfs2kafka/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000..08075a9
--- /dev/null
+++ b/examples/hdfs2kafka/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+    Document   : XmlJavadocCommentsExtractor.xsl
+    Created on : September 16, 2014, 11:30 AM
+    Description:
+        The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
+<xsl:stylesheet xmlns:xsl="" version="1.0">
+  <xsl:output method="xml" standalone="yes"/>
+  <!-- copy xml by selecting only the following nodes, attributes and text -->
+  <xsl:template match="node()|text()|@*">
+    <xsl:copy>
+      <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
+    </xsl:copy>
+  </xsl:template>
+  <!-- Strip off the following paths from the selected xml -->
+  <xsl:template match="//root/package/interface/interface
+                      |//root/package/interface/method/@qualified
+                      |//root/package/class/interface
+                      |//root/package/class/class
+                      |//root/package/class/method/@qualified
+                      |//root/package/class/field/@qualified" />
+  <xsl:strip-space elements="*"/>
diff --git a/examples/hdfs2kafka/pom.xml b/examples/hdfs2kafka/pom.xml
new file mode 100644
index 0000000..75cfb6d
--- /dev/null
+++ b/examples/hdfs2kafka/pom.xml
@@ -0,0 +1,315 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="" xmlns:xsi="" xsi:schemaLocation="">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.example</groupId>
+  <version>1.0-SNAPSHOT</version>
+  <artifactId>hdfsToKafka</artifactId>
+  <packaging>jar</packaging>
+  <!-- change these to the appropriate values -->
+  <name>HDFS to Kafka</name>
+  <description>Simple application to transfer data from HDFS to Kafka</description>
+  <properties>
+    <!-- change this if you desire to use a different version of Apex Core -->
+    <apex.version>3.5.0</apex.version>
+    <malhar.version>3.6.0</malhar.version>
+    <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
+  </properties>
+  <build>
+    <plugins>
+       <plugin>
+         <groupId>org.apache.maven.plugins</groupId>
+         <artifactId>maven-eclipse-plugin</artifactId>
+         <version>2.9</version>
+         <configuration>
+           <downloadSources>true</downloadSources>
+         </configuration>
+       </plugin>
+       <plugin>
+         <artifactId>maven-compiler-plugin</artifactId>
+         <version>3.3</version>
+         <configuration>
+           <encoding>UTF-8</encoding>
+           <source>1.7</source>
+           <target>1.7</target>
+           <debug>true</debug>
+           <optimize>false</optimize>
+           <showDeprecation>true</showDeprecation>
+           <showWarnings>true</showWarnings>
+         </configuration>
+       </plugin>
+       <plugin>
+         <artifactId>maven-dependency-plugin</artifactId>
+         <version>2.8</version>
+         <executions>
+           <execution>
+             <id>copy-dependencies</id>
+             <phase>prepare-package</phase>
+             <goals>
+               <goal>copy-dependencies</goal>
+             </goals>
+             <configuration>
+               <outputDirectory>target/deps</outputDirectory>
+               <includeScope>runtime</includeScope>
+             </configuration>
+           </execution>
+         </executions>
+       </plugin>
+       <!-- assemble the application package (jar layout described by src/assemble/appPackage.xml) -->
+       <plugin>
+         <artifactId>maven-assembly-plugin</artifactId>
+         <executions>
+           <execution>
+             <id>app-package-assembly</id>
+             <phase>package</phase>
+             <goals>
+               <goal>single</goal>
+             </goals>
+             <configuration>
+               <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
+               <appendAssemblyId>false</appendAssemblyId>
+               <descriptors>
+                 <descriptor>src/assemble/appPackage.xml</descriptor>
+               </descriptors>
+               <archiverConfig>
+                 <defaultDirectoryMode>0755</defaultDirectoryMode>
+               </archiverConfig>
+               <archive>
+                 <manifestEntries>
+                   <Class-Path>${apex.apppackage.classpath}</Class-Path>
+                   <DT-Engine-Version>${apex.version}</DT-Engine-Version>
+                   <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
+                   <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
+                   <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
+                   <!-- display name comes from the <name> element of this POM -->
+                   <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
+                   <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
+                 </manifestEntries>
+               </archive>
+             </configuration>
+           </execution>
+         </executions>
+       </plugin>
+       <plugin>
+         <artifactId>maven-antrun-plugin</artifactId>
+         <version>1.7</version>
+         <executions>
+           <execution>
+             <!-- rename the assembled jar so it carries the .apa application package extension -->
+             <phase>package</phase>
+             <configuration>
+               <target>
+                 <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
+                       tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
+               </target>
+             </configuration>
+             <goals>
+               <goal>run</goal>
+             </goals>
+           </execution>
+           <execution>
+             <!-- create resource directory for xml javadoc-->
+             <id>createJavadocDirectory</id>
+             <phase>generate-resources</phase>
+             <configuration>
+               <!-- <target> replaces the deprecated <tasks> element -->
+               <target>
+                 <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+                 <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+               </target>
+             </configuration>
+             <goals>
+               <goal>run</goal>
+             </goals>
+           </execution>
+         </executions>
+       </plugin>
+       <plugin>
+         <groupId>org.codehaus.mojo</groupId>
+         <artifactId>build-helper-maven-plugin</artifactId>
+         <version>1.9.1</version>
+         <executions>
+           <execution>
+             <id>attach-artifacts</id>
+             <phase>package</phase>
+             <goals>
+               <goal>attach-artifact</goal>
+             </goals>
+             <configuration>
+               <artifacts>
+                 <artifact>
+                   <file>target/${project.artifactId}-${project.version}.apa</file>
+                   <type>apa</type>
+                 </artifact>
+               </artifacts>
+               <skipAttach>false</skipAttach>
+             </configuration>
+           </execution>
+         </executions>
+       </plugin>
+      <!-- generate javadoc -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <executions>
+          <!-- generate xml javadoc -->
+          <execution>
+            <id>xml-doclet</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>javadoc</goal>
+            </goals>
+            <configuration>
+              <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
+              <!-- write the xml javadoc into the directory created by the createJavadocDirectory antrun execution -->
+              <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
+              <useStandardDocletOptions>false</useStandardDocletOptions>
+              <docletArtifact>
+                <groupId>com.github.markusbernhardt</groupId>
+                <artifactId>xml-doclet</artifactId>
+                <version>1.0.4</version>
+              </docletArtifact>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>xml-maven-plugin</artifactId>
+        <version>1.0</version>
+        <executions>
+          <execution>
+            <id>transform-xmljavadoc</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>transform</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <transformationSets>
+            <transformationSet>
+              <!-- input and output directories are the same: the transformed file replaces the generated one -->
+              <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
+              <includes>
+                <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+              </includes>
+              <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
+              <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
+            </transformationSet>
+          </transformationSets>
+        </configuration>
+      </plugin>
+      <!-- copy xml javadoc to class jar -->
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <version>2.6</version>
+        <executions>
+          <execution>
+            <id>copy-resources</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/classes</outputDirectory>
+              <resources>
+                <resource>
+                  <!-- source is the directory populated by the javadoc and xml-maven plugins -->
+                  <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
+                  <includes>
+                    <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+                  </includes>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <!-- add your dependencies here -->
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-library</artifactId>
+      <!-- NOTE(review): pinned to 3.4.0 although the malhar.version property in this POM is 3.6.0; confirm which version is intended -->
+      <version>3.4.0</version>
+      <!-- 
+           If you know that your application does not need transitive dependencies pulled in by malhar-library,
+           uncomment the following to reduce the size of your app package.
+      -->
+      <!--    
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+      -->
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-contrib</artifactId>
+      <!-- NOTE(review): same hard-coded 3.4.0 as malhar-library above; keep the two in step -->
+      <version>3.4.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-common</artifactId>
+      <version>${apex.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.10</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-engine</artifactId>
+      <version>${apex.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.8.2</artifactId>
+      <version>0.8.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>info.batey.kafka</groupId>
+      <artifactId>kafka-unit</artifactId>
+      <version>0.3</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
diff --git a/examples/hdfs2kafka/src/assemble/appPackage.xml b/examples/hdfs2kafka/src/assemble/appPackage.xml
new file mode 100644
index 0000000..7ad071c
--- /dev/null
+++ b/examples/hdfs2kafka/src/assemble/appPackage.xml
@@ -0,0 +1,43 @@
+<assembly xmlns=""
+    xmlns:xsi=""
+    xsi:schemaLocation="">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/resources</directory>
+      <outputDirectory>/resources</outputDirectory>
+    </fileSet>
+  </fileSets>
diff --git a/examples/hdfs2kafka/src/main/java/com/example/myapexapp/ b/examples/hdfs2kafka/src/main/java/com/example/myapexapp/
new file mode 100644
index 0000000..447ae1c
--- /dev/null
+++ b/examples/hdfs2kafka/src/main/java/com/example/myapexapp/
@@ -0,0 +1,26 @@
+package com.example.myapexapp;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
+import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator;
+public class Application implements StreamingApplication
+  /**
+   * Wires the application: a line-by-line file reader named "lines" feeds a
+   * Kafka output operator named "kafkaOutput" over a container-local stream.
+   *
+   * @param dag  the DAG to populate
+   * @param conf launch configuration (not read here)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // source: the operator names are the ones referenced by properties.xml
+    final LineByLineFileInputOperator reader =
+        dag.addOperator("lines", LineByLineFileInputOperator.class);
+
+    // sink: one Kafka message per tuple
+    final KafkaSinglePortOutputOperator<String,String> writer =
+        dag.addOperator("kafkaOutput", new KafkaSinglePortOutputOperator<String,String>());
+
+    dag.addStream("data", reader.output, writer.inputPort)
+       .setLocality(Locality.CONTAINER_LOCAL);
+  }
diff --git a/examples/hdfs2kafka/src/main/resources/META-INF/properties.xml b/examples/hdfs2kafka/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..7c624ca
--- /dev/null
+++ b/examples/hdfs2kafka/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0"?>
+  <!-- Kafka topic that receives one message per input line -->
+  <property>
+    <name>dt.operator.kafkaOutput.prop.topic</name>
+    <value>hdfs2kafka</value>
+  </property>
+  <!-- directory monitored for input files by the "lines" operator -->
+  <property>
+    <name>dt.operator.lines.prop.directory</name>
+    <value>/tmp/hdfs2kafka</value>
+  </property>
+  <!-- comma-separated Kafka producer settings; the broker address must match the broker used by the test -->
+  <property>
+    <name>dt.operator.kafkaOutput.prop.producerProperties</name>
+    <value>serializer.class=kafka.serializer.StringEncoder,producer.type=async,metadata.broker.list=localhost:9092</value>
+  </property>
diff --git a/examples/hdfs2kafka/src/test/java/com/example/myapexapp/ b/examples/hdfs2kafka/src/test/java/com/example/myapexapp/
new file mode 100644
index 0000000..2c415be
--- /dev/null
+++ b/examples/hdfs2kafka/src/test/java/com/example/myapexapp/
@@ -0,0 +1,132 @@
+package com.example.myapexapp;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import javax.validation.ConstraintViolationException;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+import info.batey.kafka.unit.KafkaUnitRule;
+import info.batey.kafka.unit.KafkaUnit;
+import kafka.producer.KeyedMessage;
+import com.datatorrent.api.LocalMode;
+import com.example.myapexapp.Application;
+import static org.junit.Assert.assertTrue;
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class ApplicationTest {
+  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
+  private static final String TOPIC = "hdfs2kafka";
+  private static final String directory = "target/hdfs2kafka";
+  private static final String FILE_NAME = "messages.txt";
+  private static final int zkPort = 2181;
+  private static final int  brokerPort = 9092;
+  private static final String BROKER = "localhost:" + brokerPort;
+  //private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0";     // first part
+  // test messages
+  private static String[] lines =
+  {
+    "1st line",
+    "2nd line",
+    "3rd line",
+    "4th line",
+    "5th line",
+  };
+  // broker port must match properties.xml
+  @Rule
+  public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
+  /**
+   * End-to-end check: writes the input file, runs the application in local
+   * mode, and verifies that every line shows up on the Kafka topic.
+   *
+   * @throws Exception if file creation, application launch or the Kafka read fails
+   */
+  @Test
+  public void testApplication() throws IOException, Exception {
+    LocalMode.Controller lc = null;
+    try {
+      // create file in monitored HDFS directory
+      createFile();
+
+      // run app asynchronously; terminate after results are checked
+      lc = asyncRun();
+
+      // get messages from Kafka topic and compare with input
+      chkOutput();
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    } finally {
+      // always stop the local cluster, even when an assertion above fails
+      if (lc != null) {
+        lc.shutdown();
+      }
+    }
+  }
+  /**
+   * Creates the input file in the monitored directory with one entry of
+   * {@code lines} per line, replacing any file left over from a previous run.
+   *
+   * @throws IOException if the file cannot be written; the test cannot proceed without it
+   */
+  private void createFile() throws IOException {
+    // remove old file and create new one
+    File file = new File(directory, FILE_NAME);
+    FileUtils.deleteQuietly(file);
+
+    String data = StringUtils.join(lines, "\n") + "\n";    // add final newline
+    try {
+      FileUtils.writeStringToFile(file, data, "UTF-8");
+    } catch (IOException e) {
+      // log with the cause and propagate instead of continuing without an input file
+      LOG.error("Error: Failed to create file {} in {}", FILE_NAME, directory, e);
+      throw e;
+    }
+    LOG.debug("Created file {} with {} lines in {}",
+              FILE_NAME, lines.length, directory);
+  }
+  private LocalMode.Controller asyncRun() throws Exception {
+    Configuration conf = getConfig();
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(new Application(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+    return lc;
+  }
+  /**
+   * Loads the packaged properties.xml and points the file input operator
+   * at the local test directory.
+   *
+   * @return configuration used to launch the application under test
+   */
+  private Configuration getConfig() {
+      Configuration conf = new Configuration(false);
+      conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+      // override the monitored directory of the "lines" operator declared in Application
+      conf.set("dt.operator.lines.prop.directory", directory);
+      return conf;
+  }
+  /**
+   * Reads the messages back from the Kafka topic and checks that they match
+   * {@code lines} in number, content and order.
+   *
+   * @throws Exception if the messages cannot be read from Kafka
+   */
+  private void chkOutput() throws Exception {
+    KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
+
+    // wait for messages to appear in kafka
+    Thread.sleep(10000);
+
+    // let a read failure propagate so the test reports it, rather than a later NullPointerException
+    List<String> messages = ku.readMessages(TOPIC, lines.length);
+
+    Assert.assertEquals("Error: message count mismatch", lines.length, messages.size());
+    for (int i = 0; i < lines.length; ++i) {
+      Assert.assertEquals("Error: message mismatch at index " + i, lines[i], messages.get(i));
+    }
+  }
diff --git a/examples/hdfs2kafka/src/test/resources/ b/examples/hdfs2kafka/src/test/resources/
new file mode 100644
index 0000000..98544e8
--- /dev/null
+++ b/examples/hdfs2kafka/src/test/resources/
@@ -0,0 +1,22 @@
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
diff --git a/examples/kafka/ b/examples/kafka/
new file mode 100644
index 0000000..1cecdaa
--- /dev/null
+++ b/examples/kafka/
@@ -0,0 +1,6 @@
+This sample application shows how to read lines from a Kafka topic using the new (0.9)
+Kafka input operator and write them out to HDFS using rolling files with a bounded size.
+The output files start out with a `.tmp` extension and get renamed when they reach the
+size bound.  Additional operators to perform parsing, aggregation or filtering can be
+inserted into this pipeline as needed.
diff --git a/examples/kafka/XmlJavadocCommentsExtractor.xsl b/examples/kafka/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000..08075a9
--- /dev/null
+++ b/examples/kafka/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+    Document   : XmlJavadocCommentsExtractor.xsl
+    Created on : September 16, 2014, 11:30 AM
+    Description:
+        The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
+<xsl:stylesheet xmlns:xsl="" version="1.0">
+  <xsl:output method="xml" standalone="yes"/>
+  <!-- copy xml by selecting only the following nodes, attributes and text -->
+  <xsl:template match="node()|text()|@*">
+    <xsl:copy>
+      <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
+    </xsl:copy>
+  </xsl:template>
+  <!-- Strip off the following paths from the selected xml -->
+  <xsl:template match="//root/package/interface/interface
+                      |//root/package/interface/method/@qualified
+                      |//root/package/class/interface
+                      |//root/package/class/class
+                      |//root/package/class/method/@qualified
+                      |//root/package/class/field/@qualified" />
+  <xsl:strip-space elements="*"/>
diff --git a/examples/kafka/pom.xml b/examples/kafka/pom.xml
new file mode 100644
index 0000000..ce325bf
--- /dev/null
+++ b/examples/kafka/pom.xml
@@ -0,0 +1,307 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="" xmlns:xsi="" xsi:schemaLocation="">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.example</groupId>
+  <version>1.0-SNAPSHOT</version>
+  <artifactId>kafka2hdfs</artifactId>
+  <packaging>jar</packaging>
+  <!-- change these to the appropriate values -->
+  <name>New Kafka Input Operator</name>
+  <description>Example Use of New Kafka Input Operator</description>
+  <properties>
+    <!-- change this if you desire to use a different version of Apex Core -->
+    <apex.version>3.5.0</apex.version>
+    <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
+    <malhar.version>3.6.0</malhar.version>
+  </properties>
+  <build>
+    <plugins>
+       <plugin>
+         <groupId>org.apache.maven.plugins</groupId>
+         <artifactId>maven-eclipse-plugin</artifactId>
+         <version>2.9</version>
+         <configuration>
+           <downloadSources>true</downloadSources>
+         </configuration>
+       </plugin>
+       <plugin>
+         <artifactId>maven-compiler-plugin</artifactId>
+         <version>3.3</version>
+         <configuration>
+           <encoding>UTF-8</encoding>
+           <source>1.7</source>
+           <target>1.7</target>
+           <debug>true</debug>
+           <optimize>false</optimize>
+           <showDeprecation>true</showDeprecation>
+           <showWarnings>true</showWarnings>
+         </configuration>
+       </plugin>
+       <plugin>
+         <artifactId>maven-dependency-plugin</artifactId>
+         <version>2.8</version>
+         <executions>
+           <execution>
+             <id>copy-dependencies</id>
+             <phase>prepare-package</phase>
+             <goals>
+               <goal>copy-dependencies</goal>
+             </goals>
+             <configuration>
+               <outputDirectory>target/deps</outputDirectory>
+               <includeScope>runtime</includeScope>
+             </configuration>
+           </execution>
+         </executions>
+       </plugin>
+       <!-- assemble the application package (jar layout described by src/assemble/appPackage.xml) -->
+       <plugin>
+         <artifactId>maven-assembly-plugin</artifactId>
+         <executions>
+           <execution>
+             <id>app-package-assembly</id>
+             <phase>package</phase>
+             <goals>
+               <goal>single</goal>
+             </goals>
+             <configuration>
+               <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
+               <appendAssemblyId>false</appendAssemblyId>
+               <descriptors>
+                 <descriptor>src/assemble/appPackage.xml</descriptor>
+               </descriptors>
+               <archiverConfig>
+                 <defaultDirectoryMode>0755</defaultDirectoryMode>
+               </archiverConfig>
+               <archive>
+                 <manifestEntries>
+                   <Class-Path>${apex.apppackage.classpath}</Class-Path>
+                   <DT-Engine-Version>${apex.version}</DT-Engine-Version>
+                   <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
+                   <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
+                   <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
+                   <!-- display name comes from the <name> element of this POM -->
+                   <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
+                   <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
+                 </manifestEntries>
+               </archive>
+             </configuration>
+           </execution>
+         </executions>
+       </plugin>
+       <plugin>
+         <artifactId>maven-antrun-plugin</artifactId>
+         <version>1.7</version>
+         <executions>
+           <execution>
+             <!-- rename the assembled jar so it carries the .apa application package extension -->
+             <phase>package</phase>
+             <configuration>
+               <target>
+                 <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
+                       tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
+               </target>
+             </configuration>
+             <goals>
+               <goal>run</goal>
+             </goals>
+           </execution>
+           <execution>
+             <!-- create resource directory for xml javadoc-->
+             <id>createJavadocDirectory</id>
+             <phase>generate-resources</phase>
+             <configuration>
+               <!-- <target> replaces the deprecated <tasks> element -->
+               <target>
+                 <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+                 <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+               </target>
+             </configuration>
+             <goals>
+               <goal>run</goal>
+             </goals>
+           </execution>
+         </executions>
+       </plugin>
+       <plugin>
+         <groupId>org.codehaus.mojo</groupId>
+         <artifactId>build-helper-maven-plugin</artifactId>
+         <version>1.9.1</version>
+         <executions>
+           <execution>
+             <id>attach-artifacts</id>
+             <phase>package</phase>
+             <goals>
+               <goal>attach-artifact</goal>
+             </goals>
+             <configuration>
+               <artifacts>
+                 <artifact>
+                   <file>target/${project.artifactId}-${project.version}.apa</file>
+                   <type>apa</type>
+                 </artifact>
+               </artifacts>
+               <skipAttach>false</skipAttach>
+             </configuration>
+           </execution>
+         </executions>
+       </plugin>
+      <!-- generate javadoc -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <executions>
+          <!-- generate xml javadoc -->
+          <execution>
+            <id>xml-doclet</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>javadoc</goal>
+            </goals>
+            <configuration>
+              <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
+              <!-- write the xml javadoc into the directory created by the createJavadocDirectory antrun execution -->
+              <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
+              <useStandardDocletOptions>false</useStandardDocletOptions>
+              <docletArtifact>
+                <groupId>com.github.markusbernhardt</groupId>
+                <artifactId>xml-doclet</artifactId>
+                <version>1.0.4</version>
+              </docletArtifact>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>xml-maven-plugin</artifactId>
+        <version>1.0</version>
+        <executions>
+          <execution>
+            <id>transform-xmljavadoc</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>transform</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <transformationSets>
+            <transformationSet>
+              <!-- input and output directories are the same: the javadoc XML is transformed in place -->
+              <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
+              <includes>
+                <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+              </includes>
+              <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
+              <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
+            </transformationSet>
+          </transformationSets>
+        </configuration>
+      </plugin>
+      <!-- copy xml javadoc to class jar -->
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <version>2.6</version>
+        <executions>
+          <execution>
+            <id>copy-resources</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/classes</outputDirectory>
+              <resources>
+                <resource>
+                  <!-- source is the directory the javadoc XML was generated into -->
+                  <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
+                  <includes>
+                    <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+                  </includes>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <!-- add your dependencies here -->
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-kafka</artifactId>
+      <version>${malhar.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka-clients</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${malhar.version}</version>
+      <!--
+           The exclusions below drop all transitive dependencies pulled in by malhar-library to reduce
+           the size of your app package. Remove them if your application needs those dependencies.
+      -->
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-common</artifactId>
+      <version>${apex.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.10</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-engine</artifactId>
+      <version>${apex.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <!-- NOTE(review): the version element below is empty; presumably a Kafka 0.9.x release is intended (this is the Kafka 0.9 example). Confirm and fill in. -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version></version>
+    </dependency>
+    <!-- kafka-unit is referenced only from the test sources, so keep it out of the application package -->
+    <dependency>
+      <groupId>info.batey.kafka</groupId>
+      <artifactId>kafka-unit</artifactId>
+      <version>0.4</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
diff --git a/examples/kafka/src/assemble/appPackage.xml b/examples/kafka/src/assemble/appPackage.xml
new file mode 100644
index 0000000..7ad071c
--- /dev/null
+++ b/examples/kafka/src/assemble/appPackage.xml
@@ -0,0 +1,43 @@
+<!-- Assembly descriptor for the application package: a jar-format archive with the app jar under /app, dependencies under /lib, and configuration and resources in their own directories. -->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <!-- the application jar itself -->
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <!-- everything found in target/deps goes to /lib -->
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <!-- XML files from src/site/conf -->
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <!-- META-INF content, including properties.xml -->
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/resources</directory>
+      <outputDirectory>/resources</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/examples/kafka/src/main/java/com/example/myapexapp/ b/examples/kafka/src/main/java/com/example/myapexapp/
new file mode 100644
index 0000000..09089eb
--- /dev/null
+++ b/examples/kafka/src/main/java/com/example/myapexapp/
@@ -0,0 +1,26 @@
+package com.example.myapexapp;
+import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+public class KafkaApp implements StreamingApplication
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    KafkaSinglePortInputOperator in
+      = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
+    in.setInitialOffset(;
+    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
+    dag.addStream("data", in.outputPort, out.input);
+  }
diff --git a/examples/kafka/src/main/java/com/example/myapexapp/ b/examples/kafka/src/main/java/com/example/myapexapp/
new file mode 100644
index 0000000..2b184c6
--- /dev/null
+++ b/examples/kafka/src/main/java/com/example/myapexapp/
@@ -0,0 +1,34 @@
+package com.example.myapexapp;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import javax.validation.constraints.NotNull;
+/**
+ * Converts each tuple to a string and writes it as a new line to the output file
+ */
+public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
+  private static final String NL = System.lineSeparator();
+  private static final Charset CS = StandardCharsets.UTF_8;
+  @NotNull
+  private String baseName;
+  @Override
+  public byte[] getBytesForTuple(byte[] t) {
+    String result = new String(t, CS) + NL;
+    return result.getBytes(CS);
+ }
+  @Override
+  protected String getFileName(byte[] tuple) {
+    return baseName;
+  }
+  public String getBaseName() { return baseName; }
+  public void setBaseName(String v) { baseName = v; }
diff --git a/examples/kafka/src/main/resources/META-INF/properties.xml b/examples/kafka/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..a896168
--- /dev/null
+++ b/examples/kafka/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0"?>
+<!-- Default property values for the Kafka2HDFS application. -->
+<configuration>
+  <!--
+  <property>
+    <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+    <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
+  </property>
+  -->
+  <!-- memory assigned to app master
+  <property>
+    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <value>2048</value>
+  </property>
+  -->
+  <!-- kafka input operator (0.9) -->
+  <property>
+    <name>dt.application.Kafka2HDFS.operator.kafkaIn.prop.initialPartitionCount</name>
+    <value>1</value>
+  </property>
+  <property>
+    <name>dt.application.Kafka2HDFS.operator.kafkaIn.prop.topics</name>
+    <value>kafka2hdfs</value>
+  </property>
+  <property>
+    <name>dt.application.Kafka2HDFS.operator.kafkaIn.prop.clusters</name>
+    <value>localhost:9092</value>  <!-- broker (NOT zookeeper) address -->
+  </property>
+  <!-- file output operator -->
+  <property>
+    <name>dt.application.Kafka2HDFS.operator.fileOut.prop.filePath</name>
+    <value>/tmp/FromKafka</value>
+  </property>
+  <property>
+    <name>dt.application.Kafka2HDFS.operator.fileOut.prop.baseName</name>
+    <value>test</value>
+  </property>
+  <property>
+    <name>dt.application.Kafka2HDFS.operator.fileOut.prop.maxLength</name>
+    <value>1024</value>
+  </property>
+  <property>
+    <name>dt.application.Kafka2HDFS.operator.fileOut.prop.rotationWindows</name>
+    <value>4</value>
+  </property>
+</configuration>
diff --git a/examples/kafka/src/test/java/com/example/myapexapp/ b/examples/kafka/src/test/java/com/example/myapexapp/
new file mode 100644
index 0000000..635d25a
--- /dev/null
+++ b/examples/kafka/src/test/java/com/example/myapexapp/
@@ -0,0 +1,152 @@
+/**
+ * Put your copyright and license info here.
+ */
+package com.example.myapexapp;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+import javax.validation.ConstraintViolationException;
+import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import info.batey.kafka.unit.KafkaUnitRule;
+import info.batey.kafka.unit.KafkaUnit;
+import kafka.producer.KeyedMessage;
+import com.datatorrent.api.LocalMode;
+import static org.junit.Assert.assertTrue;
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class ApplicationTest {
+  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
+  // topic the test publishes to and configures the input operator to read
+  private static final String TOPIC = "kafka2hdfs";
+  // ports passed to KafkaUnitRule (zookeeper port, broker port)
+  private static final int zkPort = 2181;
+  private static final int  brokerPort = 9092;
+  // broker address given to the input operator's "clusters" property
+  private static final String BROKER = "localhost:" + brokerPort;
+  // base name and directory given to the file output operator
+  private static final String FILE_NAME = "test";
+  private static final String FILE_DIR  = "/tmp/FromKafka";
+  // file the test waits for and reads back: base name with a ".0" suffix
+  private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0";     // first part
+  // test messages: sent to the topic one per message and expected back one per line, in this order
+  private static String[] lines =
+  {
+    "1st line",
+    "2nd line",
+    "3rd line",
+    "4th line",
+    "5th line",
+  };
+  // broker port must match properties.xml
+  @Rule
+  public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
+  /**
+   * End-to-end check: publishes the test lines to Kafka, runs the application
+   * asynchronously, waits for the output file and compares it to the input.
+   * The application is shut down even when a step fails.
+   */
+  @Test
+  public void testApplication() throws Exception {
+    LocalMode.Controller lc = null;
+    try {
+      // delete output file if it exists
+      File file = new File(FILE_PATH);
+      file.delete();
+      // write messages to Kafka topic
+      writeToTopic();
+      // run app asynchronously; terminate after results are checked
+      lc = asyncRun();
+      // check for presence of output file
+      chkOutput();
+      // compare output lines to input
+      compare();
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    } finally {
+      // stop the application whether or not the checks passed
+      if (lc != null) {
+        lc.shutdown();
+      }
+    }
+  }
+  private void writeToTopic() {
+    KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
+    ku.createTopic(TOPIC);
+    for (String line : lines) {
+      KeyedMessage<String, String> kMsg = new KeyedMessage<>(TOPIC, line);
+      ku.sendMessages(kMsg);
+    }
+    LOG.debug("Sent messages to topic {}", TOPIC);
+  }
+  private Configuration getConfig() {
+      Configuration conf = new Configuration(false);
+      String pre = "dt.operator.kafkaIn.prop.";
+      conf.setEnum(pre + "initialOffset",
+                   AbstractKafkaInputOperator.InitialOffset.EARLIEST);
+      conf.setInt(pre + "initialPartitionCount", 1);
+      conf.set(   pre + "topics",                TOPIC);
+      conf.set(   pre + "clusters",              BROKER);
+      pre = "dt.operator.fileOut.prop.";
+      conf.set(   pre + "filePath",        FILE_DIR);
+      conf.set(   pre + "baseName",        FILE_NAME);
+      conf.setInt(pre + "maxLength",       40);
+      conf.setInt(pre + "rotationWindows", 3);
+      return conf;
+  }
+  private LocalMode.Controller asyncRun() throws Exception {
+    Configuration conf = getConfig();
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(new KafkaApp(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+    return lc;
+  }
+  /**
+   * Polls once per second, for at most 60 seconds, until the output file exists.
+   *
+   * @throws RuntimeException if the file still does not exist after the last poll
+   */
+  private static void chkOutput() throws Exception {
+    final int maxSeconds = 60;
+    final File output = new File(FILE_PATH);
+    int waited = 0;
+    while (waited < maxSeconds && !output.exists()) {
+      LOG.debug("Sleeping, i = {}", waited);
+      Thread.sleep(1000);
+      ++waited;
+    }
+    if (output.exists()) {
+      return;
+    }
+    throw new RuntimeException(
+        String.format("Error: %s not found after %d seconds%n", FILE_PATH, maxSeconds));
+  }
+  /**
+   * Reads the output file line by line and asserts that it contains exactly the
+   * test lines, in order.
+   */
+  private static void compare() throws Exception {
+    // read output file; try-with-resources closes the reader even if reading fails
+    ArrayList<String> list = new ArrayList<>();
+    try (BufferedReader br = new BufferedReader(new FileReader(new File(FILE_PATH)))) {
+      String line;
+      while (null != (line = br.readLine())) {
+        list.add(line);
+      }
+    }
+    // compare (expected value first, so failure messages read correctly)
+    Assert.assertEquals("number of lines", lines.length, list.size());
+    for (int i = 0; i < lines.length; ++i) {
+      Assert.assertEquals("line " + i, lines[i], list.get(i));
+    }
+  }
diff --git a/examples/kafka/src/test/resources/ b/examples/kafka/src/test/resources/
new file mode 100644
index 0000000..3bfcdc5
--- /dev/null
+++ b/examples/kafka/src/test/resources/
@@ -0,0 +1,21 @@
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n