You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/04/24 16:16:04 UTC
[2/3] apex-malhar git commit: Grouping examples and POM and Readme
changes.
Grouping examples and POM and Readme changes.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c79def4c
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c79def4c
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c79def4c
Branch: refs/heads/master
Commit: c79def4cd6f2bfb830aa32c9a9e455a7c4eb8385
Parents: 24027ed
Author: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Authored: Wed Mar 29 12:46:59 2017 -0700
Committer: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Committed: Mon Apr 24 09:06:40 2017 -0700
----------------------------------------------------------------------
examples/hdfs2kafka/README.md | 4 -
.../hdfs2kafka/XmlJavadocCommentsExtractor.xsl | 44 ---
examples/hdfs2kafka/pom.xml | 315 -------------------
examples/hdfs2kafka/src/assemble/appPackage.xml | 43 ---
.../java/com/example/myapexapp/Application.java | 26 --
.../src/main/resources/META-INF/properties.xml | 16 -
.../com/example/myapexapp/ApplicationTest.java | 132 --------
.../src/test/resources/log4j.properties | 22 --
examples/kafka/README.md | 10 +
examples/kafka/XmlJavadocCommentsExtractor.xsl | 44 ---
examples/kafka/pom.xml | 305 ++----------------
.../java/com/example/myapexapp/KafkaApp.java | 26 --
.../example/myapexapp/LineOutputOperator.java | 34 --
.../examples/kafka/hdfs2kafka/Application.java | 26 ++
.../examples/kafka/kafka2hdfs/KafkaApp.java | 26 ++
.../kafka/kafka2hdfs/LineOutputOperator.java | 34 ++
.../META-INF/properties-hdfs2kafka.xml | 16 +
.../com/example/myapexapp/ApplicationTest.java | 152 ---------
.../kafka/hdfs2kafka/ApplicationTest.java | 125 ++++++++
.../kafka/kafka2hdfs/ApplicationTest.java | 150 +++++++++
examples/pom.xml | 1 +
pom.xml | 2 +-
22 files changed, 420 insertions(+), 1133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/README.md
----------------------------------------------------------------------
diff --git a/examples/hdfs2kafka/README.md b/examples/hdfs2kafka/README.md
deleted file mode 100644
index 166abd3..0000000
--- a/examples/hdfs2kafka/README.md
+++ /dev/null
@@ -1,4 +0,0 @@
-This sample application shows how to read lines from files in HDFS and write
-them out to a Kafka topic. Each line of the input file is considered a separate
-message. The topic name, the name of the directory that is monitored for input
-files, and other parameters are configurable in `META_INF/properties.xml`.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/XmlJavadocCommentsExtractor.xsl
----------------------------------------------------------------------
diff --git a/examples/hdfs2kafka/XmlJavadocCommentsExtractor.xsl b/examples/hdfs2kafka/XmlJavadocCommentsExtractor.xsl
deleted file mode 100644
index 08075a9..0000000
--- a/examples/hdfs2kafka/XmlJavadocCommentsExtractor.xsl
+++ /dev/null
@@ -1,44 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
--->
-
-<!--
- Document : XmlJavadocCommentsExtractor.xsl
- Created on : September 16, 2014, 11:30 AM
- Description:
- The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
--->
-
-<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
- <xsl:output method="xml" standalone="yes"/>
-
- <!-- copy xml by selecting only the following nodes, attributes and text -->
- <xsl:template match="node()|text()|@*">
- <xsl:copy>
- <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
- </xsl:copy>
- </xsl:template>
-
- <!-- Strip off the following paths from the selected xml -->
- <xsl:template match="//root/package/interface/interface
- |//root/package/interface/method/@qualified
- |//root/package/class/interface
- |//root/package/class/class
- |//root/package/class/method/@qualified
- |//root/package/class/field/@qualified" />
-
- <xsl:strip-space elements="*"/>
-</xsl:stylesheet>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/pom.xml
----------------------------------------------------------------------
diff --git a/examples/hdfs2kafka/pom.xml b/examples/hdfs2kafka/pom.xml
deleted file mode 100644
index 75cfb6d..0000000
--- a/examples/hdfs2kafka/pom.xml
+++ /dev/null
@@ -1,315 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>com.example</groupId>
- <version>1.0-SNAPSHOT</version>
- <artifactId>hdfsToKafka</artifactId>
- <packaging>jar</packaging>
-
- <!-- change these to the appropriate values -->
- <name>HDFS to Kafka</name>
- <description>Simple application to transfer data from HDFS to Kafka</description>
-
- <properties>
- <!-- change this if you desire to use a different version of Apex Core -->
- <apex.version>3.5.0</apex.version>
- <malhar.version>3.6.0</malhar.version>
- <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.9</version>
- <configuration>
- <downloadSources>true</downloadSources>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.3</version>
- <configuration>
- <encoding>UTF-8</encoding>
- <source>1.7</source>
- <target>1.7</target>
- <debug>true</debug>
- <optimize>false</optimize>
- <showDeprecation>true</showDeprecation>
- <showWarnings>true</showWarnings>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>2.8</version>
- <executions>
- <execution>
- <id>copy-dependencies</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>target/deps</outputDirectory>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <executions>
- <execution>
- <id>app-package-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
- <appendAssemblyId>false</appendAssemblyId>
- <descriptors>
- <descriptor>src/assemble/appPackage.xml</descriptor>
- </descriptors>
- <archiverConfig>
- <defaultDirectoryMode>0755</defaultDirectoryMode>
- </archiverConfig>
- <archive>
- <manifestEntries>
- <Class-Path>${apex.apppackage.classpath}</Class-Path>
- <DT-Engine-Version>${apex.version}</DT-Engine-Version>
- <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
- <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
- <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
- <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
- <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
- </manifestEntries>
- </archive>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <execution>
- <phase>package</phase>
- <configuration>
- <target>
- <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
- tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
- </target>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- <execution>
- <!-- create resource directory for xml javadoc-->
- <id>createJavadocDirectory</id>
- <phase>generate-resources</phase>
- <configuration>
- <tasks>
- <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- </tasks>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.9.1</version>
- <executions>
- <execution>
- <id>attach-artifacts</id>
- <phase>package</phase>
- <goals>
- <goal>attach-artifact</goal>
- </goals>
- <configuration>
- <artifacts>
- <artifact>
- <file>target/${project.artifactId}-${project.version}.apa</file>
- <type>apa</type>
- </artifact>
- </artifacts>
- <skipAttach>false</skipAttach>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- generate javdoc -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <executions>
- <!-- generate xml javadoc -->
- <execution>
- <id>xml-doclet</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>javadoc</goal>
- </goals>
- <configuration>
- <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
- <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
- <useStandardDocletOptions>false</useStandardDocletOptions>
- <docletArtifact>
- <groupId>com.github.markusbernhardt</groupId>
- <artifactId>xml-doclet</artifactId>
- <version>1.0.4</version>
- </docletArtifact>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>xml-maven-plugin</artifactId>
- <version>1.0</version>
- <executions>
- <execution>
- <id>transform-xmljavadoc</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>transform</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <transformationSets>
- <transformationSet>
- <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
- <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
- </transformationSet>
- </transformationSets>
- </configuration>
- </plugin>
- <!-- copy xml javadoc to class jar -->
- <plugin>
- <artifactId>maven-resources-plugin</artifactId>
- <version>2.6</version>
- <executions>
- <execution>
- <id>copy-resources</id>
- <phase>process-resources</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>${basedir}/target/classes</outputDirectory>
- <resources>
- <resource>
- <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <filtering>true</filtering>
- </resource>
- </resources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- </plugins>
-
- </build>
-
- <dependencies>
- <!-- add your dependencies here -->
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-library</artifactId>
- <version>3.4.0</version>
- <!--
- If you know that your application does not need transitive dependencies pulled in by malhar-library,
- uncomment the following to reduce the size of your app package.
- -->
- <!--
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- -->
- </dependency>
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-contrib</artifactId>
- <version>3.4.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>apex-common</artifactId>
- <version>${apex.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.10</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>apex-engine</artifactId>
- <version>${apex.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.8.2</artifactId>
- <version>0.8.1</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>info.batey.kafka</groupId>
- <artifactId>kafka-unit</artifactId>
- <version>0.3</version>
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- </dependencies>
-
-</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/hdfs2kafka/src/assemble/appPackage.xml b/examples/hdfs2kafka/src/assemble/appPackage.xml
deleted file mode 100644
index 7ad071c..0000000
--- a/examples/hdfs2kafka/src/assemble/appPackage.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
- <id>appPackage</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>${basedir}/target/</directory>
- <outputDirectory>/app</outputDirectory>
- <includes>
- <include>${project.artifactId}-${project.version}.jar</include>
- </includes>
- </fileSet>
- <fileSet>
- <directory>${basedir}/target/deps</directory>
- <outputDirectory>/lib</outputDirectory>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/site/conf</directory>
- <outputDirectory>/conf</outputDirectory>
- <includes>
- <include>*.xml</include>
- </includes>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/main/resources/META-INF</directory>
- <outputDirectory>/META-INF</outputDirectory>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/main/resources/app</directory>
- <outputDirectory>/app</outputDirectory>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/main/resources/resources</directory>
- <outputDirectory>/resources</outputDirectory>
- </fileSet>
- </fileSets>
-
-</assembly>
-
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/src/main/java/com/example/myapexapp/Application.java
----------------------------------------------------------------------
diff --git a/examples/hdfs2kafka/src/main/java/com/example/myapexapp/Application.java b/examples/hdfs2kafka/src/main/java/com/example/myapexapp/Application.java
deleted file mode 100644
index 447ae1c..0000000
--- a/examples/hdfs2kafka/src/main/java/com/example/myapexapp/Application.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.example.myapexapp;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
-import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator;
-
-@ApplicationAnnotation(name="Hdfs2Kafka")
-public class Application implements StreamingApplication
-{
-
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
- LineByLineFileInputOperator in = dag.addOperator("lines",
- LineByLineFileInputOperator.class);
-
- KafkaSinglePortOutputOperator<String,String> out = dag.addOperator("kafkaOutput", new KafkaSinglePortOutputOperator<String,String>());
-
- dag.addStream("data", in.output, out.inputPort).setLocality(Locality.CONTAINER_LOCAL);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/hdfs2kafka/src/main/resources/META-INF/properties.xml b/examples/hdfs2kafka/src/main/resources/META-INF/properties.xml
deleted file mode 100644
index 7c624ca..0000000
--- a/examples/hdfs2kafka/src/main/resources/META-INF/properties.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-<?xml version="1.0"?>
-<configuration>
- <property>
- <name>dt.operator.kafkaOutput.prop.topic</name>
- <value>hdfs2kafka</value>
- </property>
- <property>
- <name>dt.operator.lines.prop.directory</name>
- <value>/tmp/hdfs2kafka</value>
- </property>
- <property>
- <name>dt.operator.kafkaOutput.prop.producerProperties</name>
- <value>serializer.class=kafka.serializer.StringEncoder,producer.type=async,metadata.broker.list=localhost:9092</value>
- </property>
-</configuration>
-
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/src/test/java/com/example/myapexapp/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/hdfs2kafka/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/hdfs2kafka/src/test/java/com/example/myapexapp/ApplicationTest.java
deleted file mode 100644
index 2c415be..0000000
--- a/examples/hdfs2kafka/src/test/java/com/example/myapexapp/ApplicationTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package com.example.myapexapp;
-
-import java.io.File;
-import java.io.IOException;
-
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-import javax.validation.ConstraintViolationException;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import org.junit.Test;
-
-import info.batey.kafka.unit.KafkaUnitRule;
-import info.batey.kafka.unit.KafkaUnit;
-
-import kafka.producer.KeyedMessage;
-
-import com.datatorrent.api.LocalMode;
-import com.example.myapexapp.Application;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test the DAG declaration in local mode.
- */
-public class ApplicationTest {
- private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
- private static final String TOPIC = "hdfs2kafka";
- private static final String directory = "target/hdfs2kafka";
- private static final String FILE_NAME = "messages.txt";
-
- private static final int zkPort = 2181;
- private static final int brokerPort = 9092;
- private static final String BROKER = "localhost:" + brokerPort;
- //private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0"; // first part
-
- // test messages
- private static String[] lines =
- {
- "1st line",
- "2nd line",
- "3rd line",
- "4th line",
- "5th line",
- };
-
- // broker port must match properties.xml
- @Rule
- public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
-
-
- @Test
- public void testApplication() throws IOException, Exception {
- try {
- // create file in monitored HDFS directory
- createFile();
-
- // run app asynchronously; terminate after results are checked
- LocalMode.Controller lc = asyncRun();
-
- // get messages from Kafka topic and compare with input
- chkOutput();
-
- lc.shutdown();
- } catch (ConstraintViolationException e) {
- Assert.fail("constraint violations: " + e.getConstraintViolations());
- }
- }
-
- // create a file with content from 'lines'
- private void createFile() throws IOException {
- // remove old file and create new one
- File file = new File(directory, FILE_NAME);
- FileUtils.deleteQuietly(file);
- try {
- String data = StringUtils.join(lines, "\n") + "\n"; // add final newline
- FileUtils.writeStringToFile(file, data, "UTF-8");
- } catch (IOException e) {
- LOG.error("Error: Failed to create file {} in {}", FILE_NAME, directory);
- e.printStackTrace();
- }
- LOG.debug("Created file {} with {} lines in {}",
- FILE_NAME, lines.length, directory);
- }
-
- private LocalMode.Controller asyncRun() throws Exception {
- Configuration conf = getConfig();
- LocalMode lma = LocalMode.newInstance();
- lma.prepareDAG(new Application(), conf);
- LocalMode.Controller lc = lma.getController();
- lc.runAsync();
- return lc;
- }
-
- private Configuration getConfig() {
- Configuration conf = new Configuration(false);
- conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
- conf.set("dt.operator.lines.prop.directory", directory);
- return conf;
- }
-
- private void chkOutput() throws Exception {
- KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
- List<String> messages = null;
-
- // wait for messages to appear in kafka
- Thread.sleep(10000);
-
- try {
- messages = ku.readMessages(TOPIC, lines.length);
- } catch (Exception e) {
- LOG.error("Error: Got exception {}", e);
- }
-
- int i = 0;
- for (String msg : messages) {
- assertTrue("Error: message mismatch", msg.equals(lines[i]));
- ++i;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/hdfs2kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/hdfs2kafka/src/test/resources/log4j.properties b/examples/hdfs2kafka/src/test/resources/log4j.properties
deleted file mode 100644
index 98544e8..0000000
--- a/examples/hdfs2kafka/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,22 +0,0 @@
-log4j.rootLogger=DEBUG,CONSOLE
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-
-log4j.appender.RFA=org.apache.log4j.RollingFileAppender
-log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
-log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.RFA.File=/tmp/app.log
-
-# to enable, add SYSLOG to rootLogger
-log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
-log4j.appender.SYSLOG.syslogHost=127.0.0.1
-log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
-log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
-log4j.appender.SYSLOG.Facility=LOCAL1
-
-#log4j.logger.org.apache.commons.beanutils=warn
-log4j.logger.com.datatorrent=debug
-log4j.logger.org.apache.apex=debug
-log4j.logger.org=info
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/README.md
----------------------------------------------------------------------
diff --git a/examples/kafka/README.md b/examples/kafka/README.md
index 1cecdaa..1a7a9c4 100644
--- a/examples/kafka/README.md
+++ b/examples/kafka/README.md
@@ -1,6 +1,16 @@
+## Kafka to HDFS example :
+
This sample application show how to read lines from a Kafka topic using the new (0.9)
Kafka input operator and write them out to HDFS using rolling files with a bounded size.
The output files start out with a `.tmp` extension and get renamed when they reach the
size bound. Additional operators to perform parsing, aggregation or filtering can be
inserted into this pipeline as needed.
+
+## HDFS to Kafka example :
+
+This sample application shows how to read lines from files in HDFS and write
+them out to a Kafka topic. Each line of the input file is considered a separate
+message. The topic name, the name of the directory that is monitored for input
+files, and other parameters are configurable in `META_INF/properties-hdfs2kafka.xml`.
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/XmlJavadocCommentsExtractor.xsl
----------------------------------------------------------------------
diff --git a/examples/kafka/XmlJavadocCommentsExtractor.xsl b/examples/kafka/XmlJavadocCommentsExtractor.xsl
deleted file mode 100644
index 08075a9..0000000
--- a/examples/kafka/XmlJavadocCommentsExtractor.xsl
+++ /dev/null
@@ -1,44 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
--->
-
-<!--
- Document : XmlJavadocCommentsExtractor.xsl
- Created on : September 16, 2014, 11:30 AM
- Description:
- The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
--->
-
-<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
- <xsl:output method="xml" standalone="yes"/>
-
- <!-- copy xml by selecting only the following nodes, attributes and text -->
- <xsl:template match="node()|text()|@*">
- <xsl:copy>
- <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
- </xsl:copy>
- </xsl:template>
-
- <!-- Strip off the following paths from the selected xml -->
- <xsl:template match="//root/package/interface/interface
- |//root/package/interface/method/@qualified
- |//root/package/class/interface
- |//root/package/class/class
- |//root/package/class/method/@qualified
- |//root/package/class/field/@qualified" />
-
- <xsl:strip-space elements="*"/>
-</xsl:stylesheet>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/examples/kafka/pom.xml b/examples/kafka/pom.xml
index ce325bf..c21bb20 100644
--- a/examples/kafka/pom.xml
+++ b/examples/kafka/pom.xml
@@ -2,258 +2,39 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>com.example</groupId>
- <version>1.0-SNAPSHOT</version>
- <artifactId>kafka2hdfs</artifactId>
- <packaging>jar</packaging>
-
- <!-- change these to the appropriate values -->
- <name>New Kafka Input Operator</name>
- <description>Example Use of New Kafka Input Operator</description>
-
- <properties>
- <!-- change this if you desire to use a different version of Apex Core -->
- <apex.version>3.5.0</apex.version>
- <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
- <malhar.version>3.6.0</malhar.version>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.9</version>
- <configuration>
- <downloadSources>true</downloadSources>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.3</version>
- <configuration>
- <encoding>UTF-8</encoding>
- <source>1.7</source>
- <target>1.7</target>
- <debug>true</debug>
- <optimize>false</optimize>
- <showDeprecation>true</showDeprecation>
- <showWarnings>true</showWarnings>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>2.8</version>
- <executions>
- <execution>
- <id>copy-dependencies</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>target/deps</outputDirectory>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <executions>
- <execution>
- <id>app-package-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
- <appendAssemblyId>false</appendAssemblyId>
- <descriptors>
- <descriptor>src/assemble/appPackage.xml</descriptor>
- </descriptors>
- <archiverConfig>
- <defaultDirectoryMode>0755</defaultDirectoryMode>
- </archiverConfig>
- <archive>
- <manifestEntries>
- <Class-Path>${apex.apppackage.classpath}</Class-Path>
- <DT-Engine-Version>${apex.version}</DT-Engine-Version>
- <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
- <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
- <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
- <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
- <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
- </manifestEntries>
- </archive>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <execution>
- <phase>package</phase>
- <configuration>
- <target>
- <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
- tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
- </target>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- <execution>
- <!-- create resource directory for xml javadoc-->
- <id>createJavadocDirectory</id>
- <phase>generate-resources</phase>
- <configuration>
- <tasks>
- <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- </tasks>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.9.1</version>
- <executions>
- <execution>
- <id>attach-artifacts</id>
- <phase>package</phase>
- <goals>
- <goal>attach-artifact</goal>
- </goals>
- <configuration>
- <artifacts>
- <artifact>
- <file>target/${project.artifactId}-${project.version}.apa</file>
- <type>apa</type>
- </artifact>
- </artifacts>
- <skipAttach>false</skipAttach>
- </configuration>
- </execution>
- </executions>
- </plugin>
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.8.0-SNAPSHOT</version>
+ </parent>
- <!-- generate javdoc -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <executions>
- <!-- generate xml javadoc -->
- <execution>
- <id>xml-doclet</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>javadoc</goal>
- </goals>
- <configuration>
- <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
- <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
- <useStandardDocletOptions>false</useStandardDocletOptions>
- <docletArtifact>
- <groupId>com.github.markusbernhardt</groupId>
- <artifactId>xml-doclet</artifactId>
- <version>1.0.4</version>
- </docletArtifact>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>xml-maven-plugin</artifactId>
- <version>1.0</version>
- <executions>
- <execution>
- <id>transform-xmljavadoc</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>transform</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <transformationSets>
- <transformationSet>
- <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
- <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
- </transformationSet>
- </transformationSets>
- </configuration>
- </plugin>
- <!-- copy xml javadoc to class jar -->
- <plugin>
- <artifactId>maven-resources-plugin</artifactId>
- <version>2.6</version>
- <executions>
- <execution>
- <id>copy-resources</id>
- <phase>process-resources</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>${basedir}/target/classes</outputDirectory>
- <resources>
- <resource>
- <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <filtering>true</filtering>
- </resource>
- </resources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- </plugins>
-
- </build>
+ <artifactId>malhar-examples-kafka</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache Apex Malhar Kafka examples</name>
+ <description>
+ kafka2hdfs is a example show how to read lines from a Kafka topic using the new (0.9)
+ Kafka input operator and write them out to HDFS.
+ hdfs2kafka is a simple application to transfer data from HDFS to Kafka
+ </description>
+
<dependencies>
<!-- add your dependencies here -->
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-kafka</artifactId>
- <version>${malhar.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </exclusion>
- </exclusions>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.apex</groupId>
- <artifactId>malhar-library</artifactId>
- <version>${malhar.version}</version>
- <!--
- If you know that your application does not need transitive dependencies pulled in by malhar-library,
- uncomment the following to reduce the size of your app package.
- -->
+ <artifactId>apex-engine</artifactId>
+ <version>${apex.core.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>info.batey.kafka</groupId>
+ <artifactId>kafka-unit</artifactId>
+ <version>0.4</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
@@ -261,37 +42,10 @@
</exclusion>
</exclusions>
</dependency>
-
<dependency>
<groupId>org.apache.apex</groupId>
- <artifactId>apex-common</artifactId>
- <version>${apex.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.10</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>apex-engine</artifactId>
- <version>${apex.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
- <version>0.9.0.1</version>
- </dependency>
-
- <dependency>
- <groupId>info.batey.kafka</groupId>
- <artifactId>kafka-unit</artifactId>
- <version>0.4</version>
+ <artifactId>malhar-contrib</artifactId>
+ <version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
@@ -299,9 +53,12 @@
</exclusion>
</exclusions>
</dependency>
-
-
-
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>0.9.0.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/java/com/example/myapexapp/KafkaApp.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/com/example/myapexapp/KafkaApp.java b/examples/kafka/src/main/java/com/example/myapexapp/KafkaApp.java
deleted file mode 100644
index 09089eb..0000000
--- a/examples/kafka/src/main/java/com/example/myapexapp/KafkaApp.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.example.myapexapp;
-
-import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
-import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-@ApplicationAnnotation(name="Kafka2HDFS")
-public class KafkaApp implements StreamingApplication
-{
-
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
- KafkaSinglePortInputOperator in
- = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
-
- in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
- LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
-
- dag.addStream("data", in.outputPort, out.input);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java b/examples/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java
deleted file mode 100644
index 2b184c6..0000000
--- a/examples/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package com.example.myapexapp;
-
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-
-import javax.validation.constraints.NotNull;
-
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
-
-/**
- * Converts each tuple to a string and writes it as a new line to the output file
- */
-public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
-{
- private static final String NL = System.lineSeparator();
- private static final Charset CS = StandardCharsets.UTF_8;
-
- @NotNull
- private String baseName;
-
- @Override
- public byte[] getBytesForTuple(byte[] t) {
- String result = new String(t, CS) + NL;
- return result.getBytes(CS);
- }
-
- @Override
- protected String getFileName(byte[] tuple) {
- return baseName;
- }
-
- public String getBaseName() { return baseName; }
- public void setBaseName(String v) { baseName = v; }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java
new file mode 100644
index 0000000..646c8e8
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java
@@ -0,0 +1,26 @@
+package org.apache.apex.examples.kafka.hdfs2kafka;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
+import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator;
+
+/**
+ * Example application that reads lines from files in an HDFS directory and
+ * publishes each line as a message to a Kafka topic.
+ * Input directory and output topic come from META-INF/properties-hdfs2kafka.xml
+ * (operators "lines" and "kafkaOutput").
+ */
+@ApplicationAnnotation(name="Hdfs2Kafka")
+public class Application implements StreamingApplication
+{
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ // file input operator: emits one String tuple per line read from the monitored directory
+ LineByLineFileInputOperator in = dag.addOperator("lines",
+ LineByLineFileInputOperator.class);
+
+ // Kafka producer operator: writes each tuple to the configured topic
+ KafkaSinglePortOutputOperator<String,String> out = dag.addOperator("kafkaOutput", new KafkaSinglePortOutputOperator<String,String>());
+
+ // CONTAINER_LOCAL keeps both operators in one container, avoiding inter-container serialization
+ dag.addStream("data", in.output, out.inputPort).setLocality(Locality.CONTAINER_LOCAL);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java
new file mode 100644
index 0000000..15f0182
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java
@@ -0,0 +1,26 @@
+package org.apache.apex.examples.kafka.kafka2hdfs;
+
+import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * Example application that reads messages from a Kafka topic using the
+ * 0.9 Kafka input operator and writes them as lines to a file on HDFS
+ * via {@link LineOutputOperator}.
+ */
+@ApplicationAnnotation(name="Kafka2HDFS")
+public class KafkaApp implements StreamingApplication
+{
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ KafkaSinglePortInputOperator in
+ = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
+
+ // start from the beginning of the topic so messages published before launch are also consumed
+ in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
+ LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
+
+ dag.addStream("data", in.outputPort, out.input);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java
new file mode 100644
index 0000000..ef40a69
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java
@@ -0,0 +1,34 @@
+package org.apache.apex.examples.kafka.kafka2hdfs;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+
+/**
+ * Converts each tuple to a string and writes it as a new line to the output file
+ */
+public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
+{
+ private static final String NL = System.lineSeparator();
+ private static final Charset CS = StandardCharsets.UTF_8;
+
+ // base name of the single output file; required (validated by @NotNull)
+ @NotNull
+ private String baseName;
+
+ // decode the tuple as UTF-8 and append the platform line separator
+ @Override
+ public byte[] getBytesForTuple(byte[] t) {
+ String result = new String(t, CS) + NL;
+ return result.getBytes(CS);
+ }
+
+ // all tuples go to the same file, identified by baseName
+ @Override
+ protected String getFileName(byte[] tuple) {
+ return baseName;
+ }
+
+ public String getBaseName() { return baseName; }
+ public void setBaseName(String v) { baseName = v; }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml b/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml
new file mode 100644
index 0000000..7c624ca
--- /dev/null
+++ b/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0"?>
+<configuration>
+ <property>
+ <name>dt.operator.kafkaOutput.prop.topic</name>
+ <value>hdfs2kafka</value>
+ </property>
+ <property>
+ <name>dt.operator.lines.prop.directory</name>
+ <value>/tmp/hdfs2kafka</value>
+ </property>
+ <property>
+ <name>dt.operator.kafkaOutput.prop.producerProperties</name>
+ <value>serializer.class=kafka.serializer.StringEncoder,producer.type=async,metadata.broker.list=localhost:9092</value>
+ </property>
+</configuration>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/test/java/com/example/myapexapp/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/kafka/src/test/java/com/example/myapexapp/ApplicationTest.java
deleted file mode 100644
index 635d25a..0000000
--- a/examples/kafka/src/test/java/com/example/myapexapp/ApplicationTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Put your copyright and license info here.
- */
-package com.example.myapexapp;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-
-import java.util.ArrayList;
-
-import javax.validation.ConstraintViolationException;
-
-import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
-import org.apache.hadoop.conf.Configuration;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import info.batey.kafka.unit.KafkaUnitRule;
-import info.batey.kafka.unit.KafkaUnit;
-
-import kafka.producer.KeyedMessage;
-
-import com.datatorrent.api.LocalMode;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test the DAG declaration in local mode.
- */
-public class ApplicationTest {
- private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
- private static final String TOPIC = "kafka2hdfs";
-
- private static final int zkPort = 2181;
- private static final int brokerPort = 9092;
- private static final String BROKER = "localhost:" + brokerPort;
- private static final String FILE_NAME = "test";
- private static final String FILE_DIR = "/tmp/FromKafka";
- private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0"; // first part
-
- // test messages
- private static String[] lines =
- {
- "1st line",
- "2nd line",
- "3rd line",
- "4th line",
- "5th line",
- };
-
- // broker port must match properties.xml
- @Rule
- public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
-
- @Test
- public void testApplication() throws Exception {
- try {
- // delete output file if it exists
- File file = new File(FILE_PATH);
- file.delete();
-
- // write messages to Kafka topic
- writeToTopic();
-
- // run app asynchronously; terminate after results are checked
- LocalMode.Controller lc = asyncRun();
-
- // check for presence of output file
- chkOutput();
-
- // compare output lines to input
- compare();
-
- lc.shutdown();
- } catch (ConstraintViolationException e) {
- Assert.fail("constraint violations: " + e.getConstraintViolations());
- }
- }
-
- private void writeToTopic() {
- KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
- ku.createTopic(TOPIC);
- for (String line : lines) {
- KeyedMessage<String, String> kMsg = new KeyedMessage<>(TOPIC, line);
- ku.sendMessages(kMsg);
- }
- LOG.debug("Sent messages to topic {}", TOPIC);
- }
-
- private Configuration getConfig() {
- Configuration conf = new Configuration(false);
- String pre = "dt.operator.kafkaIn.prop.";
- conf.setEnum(pre + "initialOffset",
- AbstractKafkaInputOperator.InitialOffset.EARLIEST);
- conf.setInt(pre + "initialPartitionCount", 1);
- conf.set( pre + "topics", TOPIC);
- conf.set( pre + "clusters", BROKER);
-
- pre = "dt.operator.fileOut.prop.";
- conf.set( pre + "filePath", FILE_DIR);
- conf.set( pre + "baseName", FILE_NAME);
- conf.setInt(pre + "maxLength", 40);
- conf.setInt(pre + "rotationWindows", 3);
-
- return conf;
- }
-
- private LocalMode.Controller asyncRun() throws Exception {
- Configuration conf = getConfig();
- LocalMode lma = LocalMode.newInstance();
- lma.prepareDAG(new KafkaApp(), conf);
- LocalMode.Controller lc = lma.getController();
- lc.runAsync();
- return lc;
- }
-
- private static void chkOutput() throws Exception {
- File file = new File(FILE_PATH);
- final int MAX = 60;
- for (int i = 0; i < MAX && (! file.exists()); ++i ) {
- LOG.debug("Sleeping, i = {}", i);
- Thread.sleep(1000);
- }
- if (! file.exists()) {
- String msg = String.format("Error: %s not found after %d seconds%n", FILE_PATH, MAX);
- throw new RuntimeException(msg);
- }
- }
-
- private static void compare() throws Exception {
- // read output file
- File file = new File(FILE_PATH);
- BufferedReader br = new BufferedReader(new FileReader(file));
- ArrayList<String> list = new ArrayList<>();
- String line;
- while (null != (line = br.readLine())) {
- list.add(line);
- }
- br.close();
-
- // compare
- Assert.assertEquals("number of lines", list.size(), lines.length);
- for (int i = 0; i < lines.length; ++i) {
- assertTrue("line", lines[i].equals(list.get(i)));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
new file mode 100644
index 0000000..aa63ee5
--- /dev/null
+++ b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
@@ -0,0 +1,125 @@
+package org.apache.apex.examples.kafka.hdfs2kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+import info.batey.kafka.unit.KafkaUnit;
+import info.batey.kafka.unit.KafkaUnitRule;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test the DAG declaration in local mode.
+ * Writes a file of test lines into a monitored directory, runs the
+ * Hdfs2Kafka application against an embedded Kafka broker (kafka-unit),
+ * and verifies the lines arrive on the topic in order.
+ */
+public class ApplicationTest {
+ private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
+ private static final String TOPIC = "hdfs2kafka";
+ private static final String directory = "target/hdfs2kafka";
+ private static final String FILE_NAME = "messages.txt";
+
+ private static final int zkPort = 2181;
+ private static final int brokerPort = 9092;
+ private static final String BROKER = "localhost:" + brokerPort;
+ //private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0"; // first part
+
+ // test messages
+ private static String[] lines =
+ {
+ "1st line",
+ "2nd line",
+ "3rd line",
+ "4th line",
+ "5th line",
+ };
+
+ // broker port must match properties.xml
+ @Rule
+ public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
+
+
+ @Test
+ public void testApplication() throws IOException, Exception {
+ try {
+ // create file in monitored HDFS directory
+ createFile();
+
+ // run app asynchronously; terminate after results are checked
+ LocalMode.Controller lc = asyncRun();
+
+ // get messages from Kafka topic and compare with input
+ chkOutput();
+
+ lc.shutdown();
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+
+ // create a file with content from 'lines'
+ private void createFile() throws IOException {
+ // remove old file and create new one
+ File file = new File(directory, FILE_NAME);
+ FileUtils.deleteQuietly(file);
+ try {
+ String data = StringUtils.join(lines, "\n") + "\n"; // add final newline
+ FileUtils.writeStringToFile(file, data, "UTF-8");
+ } catch (IOException e) {
+ LOG.error("Error: Failed to create file {} in {}", FILE_NAME, directory);
+ e.printStackTrace();
+ }
+ LOG.debug("Created file {} with {} lines in {}",
+ FILE_NAME, lines.length, directory);
+ }
+
+ // launch the application in local mode without blocking the test thread
+ private LocalMode.Controller asyncRun() throws Exception {
+ Configuration conf = getConfig();
+ LocalMode lma = LocalMode.newInstance();
+ lma.prepareDAG(new Application(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+ return lc;
+ }
+
+ // load the packaged properties, then override the input directory for the test
+ private Configuration getConfig() {
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-hdfs2kafka.xml"));
+ conf.set("dt.operator.lines.prop.directory", directory);
+ return conf;
+ }
+
+ // read messages back from the topic and compare them to 'lines' in order
+ private void chkOutput() throws Exception {
+ KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
+ List<String> messages = null;
+
+ // wait for messages to appear in kafka
+ // NOTE(review): fixed 10s sleep — a poll-with-timeout loop would be faster and less flaky
+ Thread.sleep(10000);
+
+ try {
+ messages = ku.readMessages(TOPIC, lines.length);
+ } catch (Exception e) {
+ // NOTE(review): if readMessages throws, 'messages' stays null and the loop below NPEs;
+ // consider failing the test here instead of only logging
+ LOG.error("Error: Got exception {}", e);
+ }
+
+ int i = 0;
+ for (String msg : messages) {
+ assertTrue("Error: message mismatch", msg.equals(lines[i]));
+ ++i;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
new file mode 100644
index 0000000..80d84fa
--- /dev/null
+++ b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
@@ -0,0 +1,150 @@
+/**
+ * Put your copyright and license info here.
+ */
+package org.apache.apex.examples.kafka.kafka2hdfs;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+import info.batey.kafka.unit.KafkaUnit;
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test the DAG declaration in local mode.
+ * Publishes test lines to an embedded Kafka broker (kafka-unit), runs the
+ * Kafka2HDFS application, and verifies the lines are written to the output file.
+ */
+public class ApplicationTest {
+ private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
+ private static final String TOPIC = "kafka2hdfs";
+
+ private static final int zkPort = 2181;
+ private static final int brokerPort = 9092;
+ private static final String BROKER = "localhost:" + brokerPort;
+ private static final String FILE_NAME = "test";
+ private static final String FILE_DIR = "/tmp/FromKafka";
+ private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0"; // first part
+
+ // test messages
+ private static String[] lines =
+ {
+ "1st line",
+ "2nd line",
+ "3rd line",
+ "4th line",
+ "5th line",
+ };
+
+ // broker port must match properties.xml
+ @Rule
+ public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
+
+ @Test
+ public void testApplication() throws Exception {
+ try {
+ // delete output file if it exists
+ File file = new File(FILE_PATH);
+ file.delete();
+
+ // write messages to Kafka topic
+ writeToTopic();
+
+ // run app asynchronously; terminate after results are checked
+ LocalMode.Controller lc = asyncRun();
+
+ // check for presence of output file
+ chkOutput();
+
+ // compare output lines to input
+ compare();
+
+ lc.shutdown();
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+
+ // publish each entry of 'lines' to the test topic
+ private void writeToTopic() {
+ KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
+ ku.createTopic(TOPIC);
+ for (String line : lines) {
+ KeyedMessage<String, String> kMsg = new KeyedMessage<>(TOPIC, line);
+ ku.sendMessages(kMsg);
+ }
+ LOG.debug("Sent messages to topic {}", TOPIC);
+ }
+
+ // build operator configuration programmatically (no properties file needed here)
+ private Configuration getConfig() {
+ Configuration conf = new Configuration(false);
+ String pre = "dt.operator.kafkaIn.prop.";
+ conf.setEnum(pre + "initialOffset",
+ AbstractKafkaInputOperator.InitialOffset.EARLIEST);
+ conf.setInt(pre + "initialPartitionCount", 1);
+ conf.set( pre + "topics", TOPIC);
+ conf.set( pre + "clusters", BROKER);
+
+ pre = "dt.operator.fileOut.prop.";
+ conf.set( pre + "filePath", FILE_DIR);
+ conf.set( pre + "baseName", FILE_NAME);
+ // small maxLength plus rotationWindows forces the ".0" part file to be finalized quickly
+ conf.setInt(pre + "maxLength", 40);
+ conf.setInt(pre + "rotationWindows", 3);
+
+ return conf;
+ }
+
+ // launch the application in local mode without blocking the test thread
+ private LocalMode.Controller asyncRun() throws Exception {
+ Configuration conf = getConfig();
+ LocalMode lma = LocalMode.newInstance();
+ lma.prepareDAG(new KafkaApp(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+ return lc;
+ }
+
+ // poll up to MAX seconds for the first output part file to appear
+ private static void chkOutput() throws Exception {
+ File file = new File(FILE_PATH);
+ final int MAX = 60;
+ for (int i = 0; i < MAX && (! file.exists()); ++i ) {
+ LOG.debug("Sleeping, i = {}", i);
+ Thread.sleep(1000);
+ }
+ if (! file.exists()) {
+ String msg = String.format("Error: %s not found after %d seconds%n", FILE_PATH, MAX);
+ throw new RuntimeException(msg);
+ }
+ }
+
+ // read the output part file and compare its lines to the input, in order
+ private static void compare() throws Exception {
+ // read output file
+ File file = new File(FILE_PATH);
+ BufferedReader br = new BufferedReader(new FileReader(file));
+ ArrayList<String> list = new ArrayList<>();
+ String line;
+ while (null != (line = br.readLine())) {
+ list.add(line);
+ }
+ br.close();
+
+ // compare
+ Assert.assertEquals("number of lines", list.size(), lines.length);
+ for (int i = 0; i < lines.length; ++i) {
+ assertTrue("line", lines[i].equals(list.get(i)));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 8412c8f..16cfe26 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -197,6 +197,7 @@
<module>recordReader</module>
<module>throttle</module>
<module>transform</module>
+ <module>kafka</module>
</modules>
<dependencies>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c79def4c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9bccc18..5b531ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,7 +196,6 @@
<profile>
<id>all-modules</id>
<modules>
- <module>kafka</module>
<module>hive</module>
<module>stream</module>
<module>benchmark</module>
@@ -210,6 +209,7 @@
<modules>
<module>library</module>
<module>contrib</module>
+ <module>kafka</module>
<module>examples</module>
</modules>