You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/27 15:19:18 UTC
[17/19] apex-malhar git commit: APEXMALHAR-2233 Updated the examples
to follow the structure of apex-malhar examples. Specified dependencies in
pom.xmls of individual examples correctly.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dynamic-partition/src/main/java/org/apache/apex/examples/dynamic/App.java
----------------------------------------------------------------------
diff --git a/examples/dynamic-partition/src/main/java/org/apache/apex/examples/dynamic/App.java b/examples/dynamic-partition/src/main/java/org/apache/apex/examples/dynamic/App.java
new file mode 100644
index 0000000..1a40cb5
--- /dev/null
+++ b/examples/dynamic-partition/src/main/java/org/apache/apex/examples/dynamic/App.java
@@ -0,0 +1,23 @@
+package org.apache.apex.examples.dynamic;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.lib.stream.DevNull;
+
+@ApplicationAnnotation(name="Dyn")
+public class App implements StreamingApplication
+{
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ Gen gen = dag.addOperator("gen", Gen.class);
+ DevNull devNull = dag.addOperator("devNull", DevNull.class);
+
+ dag.addStream("data", gen.out, devNull.data);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dynamic-partition/src/main/java/org/apache/apex/examples/dynamic/Gen.java
----------------------------------------------------------------------
diff --git a/examples/dynamic-partition/src/main/java/org/apache/apex/examples/dynamic/Gen.java b/examples/dynamic-partition/src/main/java/org/apache/apex/examples/dynamic/Gen.java
new file mode 100644
index 0000000..29f79ac
--- /dev/null
+++ b/examples/dynamic-partition/src/main/java/org/apache/apex/examples/dynamic/Gen.java
@@ -0,0 +1,171 @@
+package org.apache.apex.examples.dynamic;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.IOUtils;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Operator that dynamically partitions itself after 500 tuples have been emitted
+ */
+public class Gen extends BaseOperator implements InputOperator, Partitioner<Gen>, StatsListener
+{
+ private static final Logger LOG = LoggerFactory.getLogger(Gen.class);
+
+ private static final int MAX_PARTITIONS = 4; // maximum number of partitions
+
+ private int partitions = 2; // initial number of partitions
+
+ @NotNull
+ private int numTuples; // number of tuples to emit per window
+
+ private transient int count = 0;
+
+ public final transient DefaultOutputPort<Double> out = new DefaultOutputPort<Double>();
+
+ @Override
+ public void partitioned(Map<Integer, Partition<Gen>> map)
+ {
+ if (partitions != map.size()) {
+ String msg = String.format("partitions = %d, map.size = %d%n", partitions, map.size());
+ throw new RuntimeException(msg);
+ }
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ count = 0;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ if (count < numTuples) {
+ ++count;
+ out.emit(Math.random());
+ }
+ }
+
+ public int getNumTuples()
+ {
+ return numTuples;
+ }
+
+ /**
+ * Sets the number of tuples to be emitted every window.
+ * @param numTuples number of tuples
+ */
+ public void setNumTuples(int numTuples)
+ {
+ this.numTuples = numTuples;
+ }
+
+ @Override
+ public Response processStats(BatchedOperatorStats batchedOperatorStats) {
+
+ final long emittedCount = batchedOperatorStats.getTuplesEmittedPSMA();
+
+ // we only perform a single dynamic repartition
+ Response res = new Response();
+ res.repartitionRequired = false;
+ if (emittedCount > 500 && partitions < MAX_PARTITIONS) {
+ LOG.info("processStats: trying repartition of input operator current {} required {}",
+ partitions, MAX_PARTITIONS);
+ LOG.info("**** operator id = {}, window id = {}, tuplesProcessedPSMA = {}, tuplesEmittedPSMA = {}",
+ batchedOperatorStats.getOperatorId(),
+ batchedOperatorStats.getCurrentWindowId(),
+ batchedOperatorStats.getTuplesProcessedPSMA(),
+ emittedCount);
+ partitions = MAX_PARTITIONS;
+ res.repartitionRequired = true;
+ }
+
+ return res;
+ } // processStats
+
+ /**
+ * Clone object by serializing and deserializing using Kryo.
+ * Note this is different from using {@link Kryo#copy(Object)}, which will attempt to also clone transient fields.
+ *
+ * @param kryo kryo object used to clone objects
+ * @param src src object that copy from
+ * @return cloned object
+ */
+ @SuppressWarnings("unchecked")
+ private static <SRC> SRC cloneObject(Kryo kryo, SRC src)
+ {
+ kryo.setClassLoader(src.getClass().getClassLoader());
+ ByteArrayOutputStream bos = null;
+ Output output;
+ Input input = null;
+ try {
+ bos = new ByteArrayOutputStream();
+ output = new Output(bos);
+ kryo.writeObject(output, src);
+ output.close();
+ input = new Input(bos.toByteArray());
+ return (SRC)kryo.readObject(input, src.getClass());
+ } finally {
+ IOUtils.closeQuietly(input);
+ IOUtils.closeQuietly(bos);
+ }
+ }
+
+ @Override
+ public Collection<Partition<Gen>> definePartitions(
+ Collection<Partition<Gen>> list, PartitioningContext context)
+ {
+ if (partitions < 0) { // error
+ String msg = String.format("Error: Bad value: partitions = %d%n", partitions);
+ LOG.error(msg);
+ throw new RuntimeException(msg);
+ }
+
+ final int prevCount = list.size();
+ if (1 == prevCount) { // initial call
+ LOG.info("definePartitions: First call, prevCount = {}, partitions = {}",
+ prevCount, partitions);
+ }
+
+ if (prevCount == partitions) {
+ LOG.info("definePartitions: Nothing to do in definePartitions");
+ return list; // nothing to do
+ }
+
+ LOG.debug("definePartitions: Repartitioning from {} to {}", prevCount, partitions);
+
+ Kryo kryo = new Kryo();
+
+ // return value: new list of partitions (includes old list)
+ List<Partition<Gen>> newPartitions = Lists.newArrayListWithExpectedSize(partitions);
+
+ for (int i = 0; i < partitions; i++) {
+ Gen oper = cloneObject(kryo, this);
+ newPartitions.add(new DefaultPartition<>(oper));
+ }
+
+ LOG.info("definePartition: returning {} partitions", newPartitions.size());
+ return newPartitions;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dynamic-partition/src/test/java/com/example/dynamic/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/dynamic-partition/src/test/java/com/example/dynamic/ApplicationTest.java b/examples/dynamic-partition/src/test/java/com/example/dynamic/ApplicationTest.java
deleted file mode 100644
index 788b9d3..0000000
--- a/examples/dynamic-partition/src/test/java/com/example/dynamic/ApplicationTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package com.example.dynamic;
-
-import java.io.IOException;
-
-import javax.validation.ConstraintViolationException;
-
-import org.junit.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
-import com.datatorrent.api.LocalMode;
-//import com.example.myapexapp.Application;
-
-/**
- * Test the DAG declaration in local mode.
- */
-public class ApplicationTest {
-
- @Test
- public void testApplication() throws IOException, Exception {
- try {
- LocalMode lma = LocalMode.newInstance();
- Configuration conf = new Configuration(false);
- conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
- lma.prepareDAG(new App(), conf);
- LocalMode.Controller lc = lma.getController();
- lc.run(10000); // runs for 10 seconds and quits
- } catch (ConstraintViolationException e) {
- Assert.fail("constraint violations: " + e.getConstraintViolations());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dynamic-partition/src/test/java/org/apache/apex/examples/dynamic/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/dynamic-partition/src/test/java/org/apache/apex/examples/dynamic/ApplicationTest.java b/examples/dynamic-partition/src/test/java/org/apache/apex/examples/dynamic/ApplicationTest.java
new file mode 100644
index 0000000..a4c8076
--- /dev/null
+++ b/examples/dynamic-partition/src/test/java/org/apache/apex/examples/dynamic/ApplicationTest.java
@@ -0,0 +1,33 @@
+package org.apache.apex.examples.dynamic;
+
+import java.io.IOException;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class ApplicationTest {
+
+ @Test
+ public void testApplication() throws IOException, Exception {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ lma.prepareDAG(new App(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.run(10000); // runs for 10 seconds and quits
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/pom.xml
----------------------------------------------------------------------
diff --git a/examples/enricher/pom.xml b/examples/enricher/pom.xml
index a93bcf5..7a55d32 100644
--- a/examples/enricher/pom.xml
+++ b/examples/enricher/pom.xml
@@ -1,263 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
-
- <groupId>com.example</groupId>
- <version>1.0-SNAPSHOT</version>
- <artifactId>enricher</artifactId>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.7.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>malhar-examples-enricher</artifactId>
<packaging>jar</packaging>
<!-- change these to the appropriate values -->
<name>Enricher</name>
<description>Example Use of POJO Enricher</description>
- <properties>
- <!-- change this if you desire to use a different version of Apex Core -->
- <apex.version>3.5.0</apex.version>
- <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
- <malhar.version>3.6.0</malhar.version>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.9</version>
- <configuration>
- <downloadSources>true</downloadSources>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.3</version>
- <configuration>
- <encoding>UTF-8</encoding>
- <source>1.7</source>
- <target>1.7</target>
- <debug>true</debug>
- <optimize>false</optimize>
- <showDeprecation>true</showDeprecation>
- <showWarnings>true</showWarnings>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>2.8</version>
- <executions>
- <execution>
- <id>copy-dependencies</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>target/deps</outputDirectory>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <executions>
- <execution>
- <id>app-package-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
- <appendAssemblyId>false</appendAssemblyId>
- <descriptors>
- <descriptor>src/assemble/appPackage.xml</descriptor>
- </descriptors>
- <archiverConfig>
- <defaultDirectoryMode>0755</defaultDirectoryMode>
- </archiverConfig>
- <archive>
- <manifestEntries>
- <Class-Path>${apex.apppackage.classpath}</Class-Path>
- <DT-Engine-Version>${apex.version}</DT-Engine-Version>
- <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
- <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
- <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
- <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
- <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
- </manifestEntries>
- </archive>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <execution>
- <phase>package</phase>
- <configuration>
- <target>
- <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
- tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
- </target>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- <execution>
- <!-- create resource directory for xml javadoc-->
- <id>createJavadocDirectory</id>
- <phase>generate-resources</phase>
- <configuration>
- <tasks>
- <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- </tasks>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.9.1</version>
- <executions>
- <execution>
- <id>attach-artifacts</id>
- <phase>package</phase>
- <goals>
- <goal>attach-artifact</goal>
- </goals>
- <configuration>
- <artifacts>
- <artifact>
- <file>target/${project.artifactId}-${project.version}.apa</file>
- <type>apa</type>
- </artifact>
- </artifacts>
- <skipAttach>false</skipAttach>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- generate javdoc -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <executions>
- <!-- generate xml javadoc -->
- <execution>
- <id>xml-doclet</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>javadoc</goal>
- </goals>
- <configuration>
- <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
- <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
- <useStandardDocletOptions>false</useStandardDocletOptions>
- <docletArtifact>
- <groupId>com.github.markusbernhardt</groupId>
- <artifactId>xml-doclet</artifactId>
- <version>1.0.4</version>
- </docletArtifact>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>xml-maven-plugin</artifactId>
- <version>1.0</version>
- <executions>
- <execution>
- <id>transform-xmljavadoc</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>transform</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <transformationSets>
- <transformationSet>
- <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
- <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
- </transformationSet>
- </transformationSets>
- </configuration>
- </plugin>
- <!-- copy xml javadoc to class jar -->
- <plugin>
- <artifactId>maven-resources-plugin</artifactId>
- <version>2.6</version>
- <executions>
- <execution>
- <id>copy-resources</id>
- <phase>process-resources</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>${basedir}/target/classes</outputDirectory>
- <resources>
- <resource>
- <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <filtering>true</filtering>
- </resource>
- </resources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- </plugins>
-
- </build>
-
<dependencies>
- <!-- add your dependencies here -->
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-library</artifactId>
- <version>${malhar.version}</version>
- <!--
- If you know that your application does not need transitive dependencies pulled in by malhar-library,
- uncomment the following to reduce the size of your app package.
- -->
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-contrib</artifactId>
- <version>${malhar.version}</version>
- <!--
- If you know that your application does not need transitive dependencies pulled in by malhar-library,
- uncomment the following to reduce the size of your app package.
- -->
+ <version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
@@ -265,27 +27,6 @@
</exclusion>
</exclusions>
</dependency>
-
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>apex-common</artifactId>
- <version>${apex.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.10</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>apex-engine</artifactId>
- <version>${apex.version}</version>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>com.github.fge</groupId>
<artifactId>json-schema-validator</artifactId>
@@ -303,6 +44,25 @@
<artifactId>janino</artifactId>
<version>2.7.8</version>
</dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>1.9.13</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>0.13.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ <version>1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.7.0</version>
+ </dependency>
</dependencies>
-
</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/com/example/myapexapp/DataGenerator.java
----------------------------------------------------------------------
diff --git a/examples/enricher/src/main/java/com/example/myapexapp/DataGenerator.java b/examples/enricher/src/main/java/com/example/myapexapp/DataGenerator.java
deleted file mode 100644
index 3afbb87..0000000
--- a/examples/enricher/src/main/java/com/example/myapexapp/DataGenerator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package com.example.myapexapp;
-
-import java.util.Random;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * Generates Subscriber Data:
- * A Party Phone
- * A Party IMEI
- * A Party IMSI
- * Circle Id
- */
-public class DataGenerator extends BaseOperator implements InputOperator
-{
- public static int NUM_CIRCLES = 10;
-
- private Random r;
- private int count = 0;
- private int limit = 1000;
-
- public final transient DefaultOutputPort<byte[]> output = new DefaultOutputPort<>();
-
- @Override
- public void setup(OperatorContext context)
- {
- r = new Random(System.currentTimeMillis());
- }
-
- @Override
- public void beginWindow(long windowId) {
- super.beginWindow(windowId);
- count = 0;
- }
-
- @Override
- public void emitTuples()
- {
- if(count++ < limit) {
- output.emit(getRecord());
- }
- }
-
- private byte[] getRecord()
- {
- String phone = getRandomNumber(10);
- String imsi = getHashInRange(phone, 15);
- String imei = getHashInRange(imsi, 15);
- String circleId = Math.abs(phone.hashCode()) % NUM_CIRCLES + "";
-// String record = MessageFormat.format(baseDataTemplate, phone, imsi, imei, circleId);
- String record = "{" +
- "\"phone\":\"" + phone + "\"," +
- "\"imei\":\"" + imei+ "\"," +
- "\"imsi\":\"" + imsi+ "\"," +
- "\"circleId\":" + circleId +
- "}";
- return record.getBytes();
- }
-
- private String getRandomNumber(int numDigits)
- {
- String retVal = (r.nextInt((9 - 1) + 1) + 1) + "";
-
- for (int i = 0; i < numDigits - 1; i++) {
- retVal += (r.nextInt((9 - 0) + 1) + 0);
- }
- return retVal;
- }
-
- private String getHashInRange(String s, int n)
- {
- StringBuilder retVal = new StringBuilder();
- for (int i = 0, j = 0; i < n && j < s.length(); i++, j++) {
- retVal.append(Math.abs(s.charAt(j) + "".hashCode()) % 10);
- if (j == s.length() - 1) {
- j = -1;
- }
- }
- return retVal.toString();
- }
-
- public int getLimit()
- {
- return limit;
- }
-
- public void setLimit(int limit)
- {
- this.limit = limit;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/com/example/myapexapp/EnricherAppWithJSONFile.java
----------------------------------------------------------------------
diff --git a/examples/enricher/src/main/java/com/example/myapexapp/EnricherAppWithJSONFile.java b/examples/enricher/src/main/java/com/example/myapexapp/EnricherAppWithJSONFile.java
deleted file mode 100644
index a0dab64..0000000
--- a/examples/enricher/src/main/java/com/example/myapexapp/EnricherAppWithJSONFile.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.example.myapexapp;
-
-import java.util.ArrayList;
-
-import com.datatorrent.contrib.enrich.JsonFSLoader;
-import com.datatorrent.contrib.enrich.POJOEnricher;
-import com.datatorrent.contrib.parser.JsonParser;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-@ApplicationAnnotation(name="EnricherAppWithJSONFile")
-public class EnricherAppWithJSONFile implements StreamingApplication
-{
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
- DataGenerator dataGenerator = dag.addOperator("DataGenerator", DataGenerator.class);
- JsonParser parser = dag.addOperator("Parser", JsonParser.class);
-
- /**
- * FSLoader is used to configure Enricher backend. Property of FSLoader file which is fileName is set in
- * properties.xml file.
- * The format that is used to read the file is present as an example in resources/circleMapping.txt file.
- */
- JsonFSLoader fsLoader = new JsonFSLoader();
- POJOEnricher enrich = dag.addOperator("Enrich", POJOEnricher.class);
- enrich.setStore(fsLoader);
-
- ArrayList includeFields = new ArrayList();
- includeFields.add("circleName");
- ArrayList lookupFields = new ArrayList();
- lookupFields.add("circleId");
-
- enrich.setIncludeFields(includeFields);
- enrich.setLookupFields(lookupFields);
-
- ConsoleOutputOperator console = dag.addOperator("Console", ConsoleOutputOperator.class);
-
- dag.addStream("Parse", dataGenerator.output, parser.in);
- dag.addStream("Enrich", parser.out, enrich.input);
- dag.addStream("Console", enrich.output, console.input);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/com/example/myapexapp/LineOutputOperator.java
----------------------------------------------------------------------
diff --git a/examples/enricher/src/main/java/com/example/myapexapp/LineOutputOperator.java b/examples/enricher/src/main/java/com/example/myapexapp/LineOutputOperator.java
deleted file mode 100644
index 3b7a298..0000000
--- a/examples/enricher/src/main/java/com/example/myapexapp/LineOutputOperator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package com.example.myapexapp;
-
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-
-import javax.validation.constraints.NotNull;
-
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
-
-/**
- * Converts each tuple to a string and writes it as a new line to the output file
- */
-public class LineOutputOperator extends AbstractFileOutputOperator<Object>
-{
- private static final String NL = System.lineSeparator();
- private static final Charset CS = StandardCharsets.UTF_8;
-
- @NotNull
- private String baseName;
-
- @Override
- public byte[] getBytesForTuple(Object t) {
- String result = new String(t.toString().getBytes(), CS) + NL;
- return result.getBytes(CS);
- }
-
- @Override
- protected String getFileName(Object tuple) {
- return baseName;
- }
-
- public String getBaseName() { return baseName; }
- public void setBaseName(String v) { baseName = v; }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/com/example/myapexapp/POJO.java
----------------------------------------------------------------------
diff --git a/examples/enricher/src/main/java/com/example/myapexapp/POJO.java b/examples/enricher/src/main/java/com/example/myapexapp/POJO.java
deleted file mode 100644
index 32845e8..0000000
--- a/examples/enricher/src/main/java/com/example/myapexapp/POJO.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package com.example.myapexapp;
-
-public class POJO
-{
- private String phone;
- private String imei;
- private String imsi;
- private int circleId;
-
- public String getPhone()
- {
- return phone;
- }
-
- public void setPhone(String phone)
- {
- this.phone = phone;
- }
-
- public String getImei()
- {
- return imei;
- }
-
- public void setImei(String imei)
- {
- this.imei = imei;
- }
-
- public String getImsi()
- {
- return imsi;
- }
-
- public void setImsi(String imsi)
- {
- this.imsi = imsi;
- }
-
- public int getCircleId()
- {
- return circleId;
- }
-
- public void setCircleId(int circleId)
- {
- this.circleId = circleId;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/com/example/myapexapp/POJOEnriched.java
----------------------------------------------------------------------
diff --git a/examples/enricher/src/main/java/com/example/myapexapp/POJOEnriched.java b/examples/enricher/src/main/java/com/example/myapexapp/POJOEnriched.java
deleted file mode 100644
index bed2cfb..0000000
--- a/examples/enricher/src/main/java/com/example/myapexapp/POJOEnriched.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package com.example.myapexapp;
-
-public class POJOEnriched
-{
- private String phone;
- private String imei;
- private String imsi;
- private int circleId;
- private String circleName;
-
- public String getPhone()
- {
- return phone;
- }
-
- public void setPhone(String phone)
- {
- this.phone = phone;
- }
-
- public String getImei()
- {
- return imei;
- }
-
- public void setImei(String imei)
- {
- this.imei = imei;
- }
-
- public String getImsi()
- {
- return imsi;
- }
-
- public void setImsi(String imsi)
- {
- this.imsi = imsi;
- }
-
- public int getCircleId()
- {
- return circleId;
- }
-
- public void setCircleId(int circleId)
- {
- this.circleId = circleId;
- }
-
- public String getCircleName()
- {
- return circleName;
- }
-
- public void setCircleName(String circleName)
- {
- this.circleName = circleName;
- }
-
- @Override public String toString()
- {
- return "POJOEnriched{" +
- "phone='" + phone + '\'' +
- ", imei='" + imei + '\'' +
- ", imsi='" + imsi + '\'' +
- ", circleId=" + circleId +
- ", circleName='" + circleName + '\'' +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/org/apache/apex/examples/enricher/DataGenerator.java
----------------------------------------------------------------------
diff --git a/examples/enricher/src/main/java/org/apache/apex/examples/enricher/DataGenerator.java b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/DataGenerator.java
new file mode 100644
index 0000000..2ba5567
--- /dev/null
+++ b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/DataGenerator.java
@@ -0,0 +1,94 @@
+package org.apache.apex.examples.enricher;
+
+import java.util.Random;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Generates Subscriber Data:
+ * A Party Phone
+ * A Party IMEI
+ * A Party IMSI
+ * Circle Id
+ */
+public class DataGenerator extends BaseOperator implements InputOperator
+{
+ public static int NUM_CIRCLES = 10;
+
+ private Random r;
+ private int count = 0;
+ private int limit = 1000;
+
+ public final transient DefaultOutputPort<byte[]> output = new DefaultOutputPort<>();
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ r = new Random(System.currentTimeMillis());
+ }
+
+ @Override
+ public void beginWindow(long windowId) {
+ super.beginWindow(windowId);
+ count = 0;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ if(count++ < limit) {
+ output.emit(getRecord());
+ }
+ }
+
+ private byte[] getRecord()
+ {
+ String phone = getRandomNumber(10);
+ String imsi = getHashInRange(phone, 15);
+ String imei = getHashInRange(imsi, 15);
+ String circleId = Math.abs(phone.hashCode()) % NUM_CIRCLES + "";
+// String record = MessageFormat.format(baseDataTemplate, phone, imsi, imei, circleId);
+ String record = "{" +
+ "\"phone\":\"" + phone + "\"," +
+ "\"imei\":\"" + imei+ "\"," +
+ "\"imsi\":\"" + imsi+ "\"," +
+ "\"circleId\":" + circleId +
+ "}";
+ return record.getBytes();
+ }
+
+ private String getRandomNumber(int numDigits)
+ {
+ String retVal = (r.nextInt((9 - 1) + 1) + 1) + "";
+
+ for (int i = 0; i < numDigits - 1; i++) {
+ retVal += (r.nextInt((9 - 0) + 1) + 0);
+ }
+ return retVal;
+ }
+
+ private String getHashInRange(String s, int n)
+ {
+ StringBuilder retVal = new StringBuilder();
+ for (int i = 0, j = 0; i < n && j < s.length(); i++, j++) {
+ retVal.append(Math.abs(s.charAt(j) + "".hashCode()) % 10);
+ if (j == s.length() - 1) {
+ j = -1;
+ }
+ }
+ return retVal.toString();
+ }
+
+ public int getLimit()
+ {
+ return limit;
+ }
+
+ public void setLimit(int limit)
+ {
+ this.limit = limit;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/org/apache/apex/examples/enricher/EnricherAppWithJSONFile.java
----------------------------------------------------------------------
diff --git a/examples/enricher/src/main/java/org/apache/apex/examples/enricher/EnricherAppWithJSONFile.java b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/EnricherAppWithJSONFile.java
new file mode 100644
index 0000000..1a420c4
--- /dev/null
+++ b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/EnricherAppWithJSONFile.java
@@ -0,0 +1,47 @@
+package org.apache.apex.examples.enricher;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.enrich.JsonFSLoader;
+import com.datatorrent.contrib.enrich.POJOEnricher;
+import com.datatorrent.contrib.parser.JsonParser;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+@ApplicationAnnotation(name="EnricherAppWithJSONFile")
+public class EnricherAppWithJSONFile implements StreamingApplication
+{
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ DataGenerator dataGenerator = dag.addOperator("DataGenerator", DataGenerator.class);
+ JsonParser parser = dag.addOperator("Parser", JsonParser.class);
+
+ /**
+ * FSLoader is used to configure Enricher backend. Property of FSLoader file which is fileName is set in
+ * properties.xml file.
+ * The format that is used to read the file is present as an example in resources/circleMapping.txt file.
+ */
+ JsonFSLoader fsLoader = new JsonFSLoader();
+ POJOEnricher enrich = dag.addOperator("Enrich", POJOEnricher.class);
+ enrich.setStore(fsLoader);
+
+ ArrayList includeFields = new ArrayList();
+ includeFields.add("circleName");
+ ArrayList lookupFields = new ArrayList();
+ lookupFields.add("circleId");
+
+ enrich.setIncludeFields(includeFields);
+ enrich.setLookupFields(lookupFields);
+
+ ConsoleOutputOperator console = dag.addOperator("Console", ConsoleOutputOperator.class);
+
+ dag.addStream("Parse", dataGenerator.output, parser.in);
+ dag.addStream("Enrich", parser.out, enrich.input);
+ dag.addStream("Console", enrich.output, console.input);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/org/apache/apex/examples/enricher/LineOutputOperator.java
----------------------------------------------------------------------
diff --git a/examples/enricher/src/main/java/org/apache/apex/examples/enricher/LineOutputOperator.java b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/LineOutputOperator.java
new file mode 100644
index 0000000..aca7df6
--- /dev/null
+++ b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/LineOutputOperator.java
@@ -0,0 +1,34 @@
+package org.apache.apex.examples.enricher;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+
+/**
+ * Converts each tuple to a string and writes it as a new line to the output file
+ */
+public class LineOutputOperator extends AbstractFileOutputOperator<Object>
+{
+ private static final String NL = System.lineSeparator();
+ private static final Charset CS = StandardCharsets.UTF_8;
+
+ @NotNull
+ private String baseName;
+
+ @Override
+ public byte[] getBytesForTuple(Object t) {
+ String result = new String(t.toString().getBytes(), CS) + NL;
+ return result.getBytes(CS);
+ }
+
+ @Override
+ protected String getFileName(Object tuple) {
+ return baseName;
+ }
+
+ public String getBaseName() { return baseName; }
+ public void setBaseName(String v) { baseName = v; }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/org/apache/apex/examples/enricher/POJO.java
----------------------------------------------------------------------
diff --git a/examples/enricher/src/main/java/org/apache/apex/examples/enricher/POJO.java b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/POJO.java
new file mode 100644
index 0000000..d48bd1a
--- /dev/null
+++ b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/POJO.java
@@ -0,0 +1,49 @@
+package org.apache.apex.examples.enricher;
+
+public class POJO
+{
+ private String phone;
+ private String imei;
+ private String imsi;
+ private int circleId;
+
+ public String getPhone()
+ {
+ return phone;
+ }
+
+ public void setPhone(String phone)
+ {
+ this.phone = phone;
+ }
+
+ public String getImei()
+ {
+ return imei;
+ }
+
+ public void setImei(String imei)
+ {
+ this.imei = imei;
+ }
+
+ public String getImsi()
+ {
+ return imsi;
+ }
+
+ public void setImsi(String imsi)
+ {
+ this.imsi = imsi;
+ }
+
+ public int getCircleId()
+ {
+ return circleId;
+ }
+
+ public void setCircleId(int circleId)
+ {
+ this.circleId = circleId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/org/apache/apex/examples/enricher/POJOEnriched.java
----------------------------------------------------------------------
diff --git a/examples/enricher/src/main/java/org/apache/apex/examples/enricher/POJOEnriched.java b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/POJOEnriched.java
new file mode 100644
index 0000000..df08c7f
--- /dev/null
+++ b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/POJOEnriched.java
@@ -0,0 +1,71 @@
+package org.apache.apex.examples.enricher;
+
+public class POJOEnriched
+{
+ private String phone;
+ private String imei;
+ private String imsi;
+ private int circleId;
+ private String circleName;
+
+ public String getPhone()
+ {
+ return phone;
+ }
+
+ public void setPhone(String phone)
+ {
+ this.phone = phone;
+ }
+
+ public String getImei()
+ {
+ return imei;
+ }
+
+ public void setImei(String imei)
+ {
+ this.imei = imei;
+ }
+
+ public String getImsi()
+ {
+ return imsi;
+ }
+
+ public void setImsi(String imsi)
+ {
+ this.imsi = imsi;
+ }
+
+ public int getCircleId()
+ {
+ return circleId;
+ }
+
+ public void setCircleId(int circleId)
+ {
+ this.circleId = circleId;
+ }
+
+ public String getCircleName()
+ {
+ return circleName;
+ }
+
+ public void setCircleName(String circleName)
+ {
+ this.circleName = circleName;
+ }
+
+ @Override public String toString()
+ {
+ return "POJOEnriched{" +
+ "phone='" + phone + '\'' +
+ ", imei='" + imei + '\'' +
+ ", imsi='" + imsi + '\'' +
+ ", circleId=" + circleId +
+ ", circleName='" + circleName + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/enricher/src/main/resources/META-INF/properties.xml b/examples/enricher/src/main/resources/META-INF/properties.xml
index 9ecf899..543b99a 100644
--- a/examples/enricher/src/main/resources/META-INF/properties.xml
+++ b/examples/enricher/src/main/resources/META-INF/properties.xml
@@ -3,17 +3,17 @@
<!-- Parser -->
<property>
<name>dt.application.EnricherAppWithJSONFile.operator.Parser.port.out.attr.TUPLE_CLASS</name>
- <value>com.example.myapexapp.POJO</value>
+ <value>org.apache.apex.examples.enricher.POJO</value>
</property>
<!-- Enrich -->
<property>
<name>dt.application.EnricherAppWithJSONFile.operator.Enrich.port.input.attr.TUPLE_CLASS</name>
- <value>com.example.myapexapp.POJO</value>
+ <value>org.apache.apex.examples.enricher.POJO</value>
</property>
<property>
<name>dt.application.EnricherAppWithJSONFile.operator.Enrich.port.output.attr.TUPLE_CLASS</name>
- <value>com.example.myapexapp.POJOEnriched</value>
+ <value>org.apache.apex.examples.enricher.POJOEnriched</value>
</property>
<property>
<name>dt.application.EnricherAppWithJSONFile.operator.Enrich.prop.store.fileName</name>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/test/java/com/example/myapexapp/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/enricher/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/enricher/src/test/java/com/example/myapexapp/ApplicationTest.java
deleted file mode 100644
index 4b04603..0000000
--- a/examples/enricher/src/test/java/com/example/myapexapp/ApplicationTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package com.example.myapexapp;
-
-import javax.validation.ConstraintViolationException;
-
-import org.junit.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
-import com.datatorrent.api.LocalMode;
-
-/**
- * Test the DAG declaration in local mode.
- */
-public class ApplicationTest {
-
- @Test
- public void testApplication() throws Exception {
- try {
- LocalMode lma = LocalMode.newInstance();
- Configuration conf = new Configuration(false);
- conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
- lma.prepareDAG(new EnricherAppWithJSONFile(), conf);
- LocalMode.Controller lc = lma.getController();
- lc.run(10000); // runs for 10 seconds and quits
- } catch (ConstraintViolationException e) {
- Assert.fail("constraint violations: " + e.getConstraintViolations());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/test/java/org/apache/apex/examples/enricher/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/enricher/src/test/java/org/apache/apex/examples/enricher/ApplicationTest.java b/examples/enricher/src/test/java/org/apache/apex/examples/enricher/ApplicationTest.java
new file mode 100644
index 0000000..6b6698e
--- /dev/null
+++ b/examples/enricher/src/test/java/org/apache/apex/examples/enricher/ApplicationTest.java
@@ -0,0 +1,31 @@
+package org.apache.apex.examples.enricher;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class ApplicationTest {
+
+ @Test
+ public void testApplication() throws Exception {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ lma.prepareDAG(new EnricherAppWithJSONFile(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.run(10000); // runs for 10 seconds and quits
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/filter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/filter/pom.xml b/examples/filter/pom.xml
index 9407818..7ef038e 100644
--- a/examples/filter/pom.xml
+++ b/examples/filter/pom.xml
@@ -1,266 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
-
- <groupId>com.datatorrent.tutorial</groupId>
- <version>1.0-SNAPSHOT</version>
- <artifactId>filter</artifactId>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.7.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>malhar-examples-filter</artifactId>
<packaging>jar</packaging>
<!-- change these to the appropriate values -->
<name>Filter Operator</name>
<description>Apex application demonstrating filter operator</description>
- <properties>
- <!-- change this if you desire to use a different version of Apex Core -->
- <apex.version>3.5.0</apex.version>
- <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
- <malhar.version>3.6.0</malhar.version>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.9</version>
- <configuration>
- <downloadSources>true</downloadSources>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.3</version>
- <configuration>
- <encoding>UTF-8</encoding>
- <source>1.7</source>
- <target>1.7</target>
- <debug>true</debug>
- <optimize>false</optimize>
- <showDeprecation>true</showDeprecation>
- <showWarnings>true</showWarnings>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>2.8</version>
- <executions>
- <execution>
- <id>copy-dependencies</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>target/deps</outputDirectory>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <executions>
- <execution>
- <id>app-package-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
- <appendAssemblyId>false</appendAssemblyId>
- <descriptors>
- <descriptor>src/assemble/appPackage.xml</descriptor>
- </descriptors>
- <archiverConfig>
- <defaultDirectoryMode>0755</defaultDirectoryMode>
- </archiverConfig>
- <archive>
- <manifestEntries>
- <Class-Path>${apex.apppackage.classpath}</Class-Path>
- <DT-Engine-Version>${apex.version}</DT-Engine-Version>
- <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
- <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
- <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
- <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
- <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
- </manifestEntries>
- </archive>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <execution>
- <phase>package</phase>
- <configuration>
- <target>
- <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
- tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
- </target>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- <execution>
- <!-- create resource directory for xml javadoc-->
- <id>createJavadocDirectory</id>
- <phase>generate-resources</phase>
- <configuration>
- <tasks>
- <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- </tasks>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.9.1</version>
- <executions>
- <execution>
- <id>attach-artifacts</id>
- <phase>package</phase>
- <goals>
- <goal>attach-artifact</goal>
- </goals>
- <configuration>
- <artifacts>
- <artifact>
- <file>target/${project.artifactId}-${project.version}.apa</file>
- <type>apa</type>
- </artifact>
- </artifacts>
- <skipAttach>false</skipAttach>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- generate javdoc -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <executions>
- <!-- generate xml javadoc -->
- <execution>
- <id>xml-doclet</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>javadoc</goal>
- </goals>
- <configuration>
- <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
- <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
- <useStandardDocletOptions>false</useStandardDocletOptions>
- <docletArtifact>
- <groupId>com.github.markusbernhardt</groupId>
- <artifactId>xml-doclet</artifactId>
- <version>1.0.4</version>
- </docletArtifact>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>xml-maven-plugin</artifactId>
- <version>1.0</version>
- <executions>
- <execution>
- <id>transform-xmljavadoc</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>transform</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <transformationSets>
- <transformationSet>
- <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
- <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
- </transformationSet>
- </transformationSets>
- </configuration>
- </plugin>
- <!-- copy xml javadoc to class jar -->
- <plugin>
- <artifactId>maven-resources-plugin</artifactId>
- <version>2.6</version>
- <executions>
- <execution>
- <id>copy-resources</id>
- <phase>process-resources</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>${basedir}/target/classes</outputDirectory>
- <resources>
- <resource>
- <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <filtering>true</filtering>
- </resource>
- </resources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- </plugins>
-
- </build>
-
<dependencies>
- <!-- add your dependencies here -->
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-library</artifactId>
- <version>${malhar.version}</version>
- <!--
- If you know that your application does not need transitive dependencies pulled in by malhar-library,
- uncomment the following to reduce the size of your app package.
- -->
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>3.1</version>
- <type>jar</type>
- </dependency>
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-contrib</artifactId>
- <version>${malhar.version}</version>
- <!--
+ <version>${project.version}</version>
+ <!--
If you know that your application does not need transitive dependencies pulled in by malhar-library,
uncomment the following to reduce the size of your app package.
-->
@@ -277,23 +37,10 @@
<version>2.4.0</version>
<optional>true</optional>
</dependency>
-
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>apex-common</artifactId>
- <version>${apex.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.10</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>apex-engine</artifactId>
- <version>${apex.version}</version>
+ <version>${apex.core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -308,5 +55,4 @@
<version>2.7.8</version>
</dependency>
</dependencies>
-
</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/filter/src/main/java/com/datatorrent/tutorial/filter/Application.java
----------------------------------------------------------------------
diff --git a/examples/filter/src/main/java/com/datatorrent/tutorial/filter/Application.java b/examples/filter/src/main/java/com/datatorrent/tutorial/filter/Application.java
deleted file mode 100644
index 4ebb153..0000000
--- a/examples/filter/src/main/java/com/datatorrent/tutorial/filter/Application.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Copyright (c) 2016 DataTorrent, Inc.
- * All rights reserved.
- */
-
-package com.datatorrent.tutorial.filter;
-
-import org.apache.apex.malhar.lib.fs.FSRecordReaderModule;
-import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringFileOutputOperator;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.formatter.CsvFormatter;
-import com.datatorrent.contrib.parser.CsvParser;
-import com.datatorrent.lib.filter.FilterOperator;
-
-/**
- * Simple application illustrating filter operator
- */
-@ApplicationAnnotation(name="FilterExample")
-public class Application implements StreamingApplication
-{
-
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
-
- FSRecordReaderModule recordReader = dag.addModule("recordReader", FSRecordReaderModule.class);
- CsvParser csvParser = dag.addOperator("csvParser", CsvParser.class);
- FilterOperator filterOperator = dag.addOperator("filterOperator", new FilterOperator());
-
- CsvFormatter selectedFormatter = dag.addOperator("selectedFormatter", new CsvFormatter());
- CsvFormatter rejectedFormatter = dag.addOperator("rejectedFormatter", new CsvFormatter());
-
- StringFileOutputOperator selectedOutput = dag.addOperator("selectedOutput", new StringFileOutputOperator());
- StringFileOutputOperator rejectedOutput = dag.addOperator("rejectedOutput", new StringFileOutputOperator());
-
- dag.addStream("record", recordReader.records, csvParser.in);
- dag.addStream("pojo", csvParser.out, filterOperator.input);
-
- dag.addStream("pojoSelected", filterOperator.truePort, selectedFormatter.in);
- dag.addStream("pojoRejected", filterOperator.falsePort, rejectedFormatter.in);
-
- dag.addStream("csvSelected", selectedFormatter.out, selectedOutput.input);
- dag.addStream("csvRejected", rejectedFormatter.out, rejectedOutput.input);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/filter/src/main/java/com/datatorrent/tutorial/filter/TransactionPOJO.java
----------------------------------------------------------------------
diff --git a/examples/filter/src/main/java/com/datatorrent/tutorial/filter/TransactionPOJO.java b/examples/filter/src/main/java/com/datatorrent/tutorial/filter/TransactionPOJO.java
deleted file mode 100644
index c0d8817..0000000
--- a/examples/filter/src/main/java/com/datatorrent/tutorial/filter/TransactionPOJO.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Copyright (c) 2016 DataTorrent, Inc.
- * All rights reserved.
- */
-
-package com.datatorrent.tutorial.filter;
-
-public class TransactionPOJO
-{
-
- private long trasactionId;
- private double amount;
- private long accountNumber;
- private String type;
-
- public long getTrasactionId()
- {
- return trasactionId;
- }
-
- public void setTrasactionId(long trasactionId)
- {
- this.trasactionId = trasactionId;
- }
-
- public double getAmount()
- {
- return amount;
- }
-
- public void setAmount(double amount)
- {
- this.amount = amount;
- }
-
- public long getAccountNumber()
- {
- return accountNumber;
- }
-
- public void setAccountNumber(long accountNumber)
- {
- this.accountNumber = accountNumber;
- }
-
- public String getType()
- {
- return type;
- }
-
- public void setType(String type)
- {
- this.type = type;
- }
-
- @Override
- public String toString()
- {
- return "TransactionPOJO [trasactionId=" + trasactionId + ", amount=" + amount + ", accountNumber=" + accountNumber
- + ", type=" + type + "]";
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/filter/src/main/java/org/apache/apex/examples/filter/Application.java
----------------------------------------------------------------------
diff --git a/examples/filter/src/main/java/org/apache/apex/examples/filter/Application.java b/examples/filter/src/main/java/org/apache/apex/examples/filter/Application.java
new file mode 100644
index 0000000..5bbf159
--- /dev/null
+++ b/examples/filter/src/main/java/org/apache/apex/examples/filter/Application.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc.
+ * All rights reserved.
+ */
+
+package org.apache.apex.examples.filter;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReaderModule;
+import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringFileOutputOperator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.formatter.CsvFormatter;
+import com.datatorrent.contrib.parser.CsvParser;
+import com.datatorrent.lib.filter.FilterOperator;
+
+/**
+ * Simple application illustrating filter operator
+ */
+@ApplicationAnnotation(name="FilterExample")
+public class Application implements StreamingApplication
+{
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+
+ FSRecordReaderModule recordReader = dag.addModule("recordReader", FSRecordReaderModule.class);
+ CsvParser csvParser = dag.addOperator("csvParser", CsvParser.class);
+ FilterOperator filterOperator = dag.addOperator("filterOperator", new FilterOperator());
+
+ CsvFormatter selectedFormatter = dag.addOperator("selectedFormatter", new CsvFormatter());
+ CsvFormatter rejectedFormatter = dag.addOperator("rejectedFormatter", new CsvFormatter());
+
+ StringFileOutputOperator selectedOutput = dag.addOperator("selectedOutput", new StringFileOutputOperator());
+ StringFileOutputOperator rejectedOutput = dag.addOperator("rejectedOutput", new StringFileOutputOperator());
+
+ dag.addStream("record", recordReader.records, csvParser.in);
+ dag.addStream("pojo", csvParser.out, filterOperator.input);
+
+ dag.addStream("pojoSelected", filterOperator.truePort, selectedFormatter.in);
+ dag.addStream("pojoRejected", filterOperator.falsePort, rejectedFormatter.in);
+
+ dag.addStream("csvSelected", selectedFormatter.out, selectedOutput.input);
+ dag.addStream("csvRejected", rejectedFormatter.out, rejectedOutput.input);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/filter/src/main/java/org/apache/apex/examples/filter/TransactionPOJO.java
----------------------------------------------------------------------
diff --git a/examples/filter/src/main/java/org/apache/apex/examples/filter/TransactionPOJO.java b/examples/filter/src/main/java/org/apache/apex/examples/filter/TransactionPOJO.java
new file mode 100644
index 0000000..4b0d7a9
--- /dev/null
+++ b/examples/filter/src/main/java/org/apache/apex/examples/filter/TransactionPOJO.java
@@ -0,0 +1,62 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc.
+ * All rights reserved.
+ */
+
+package org.apache.apex.examples.filter;
+
+public class TransactionPOJO
+{
+
+ private long trasactionId;
+ private double amount;
+ private long accountNumber;
+ private String type;
+
+ public long getTrasactionId()
+ {
+ return trasactionId;
+ }
+
+ public void setTrasactionId(long trasactionId)
+ {
+ this.trasactionId = trasactionId;
+ }
+
+ public double getAmount()
+ {
+ return amount;
+ }
+
+ public void setAmount(double amount)
+ {
+ this.amount = amount;
+ }
+
+ public long getAccountNumber()
+ {
+ return accountNumber;
+ }
+
+ public void setAccountNumber(long accountNumber)
+ {
+ this.accountNumber = accountNumber;
+ }
+
+ public String getType()
+ {
+ return type;
+ }
+
+ public void setType(String type)
+ {
+ this.type = type;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TransactionPOJO [trasactionId=" + trasactionId + ", amount=" + amount + ", accountNumber=" + accountNumber
+ + ", type=" + type + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/filter/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/filter/src/main/resources/META-INF/properties.xml b/examples/filter/src/main/resources/META-INF/properties.xml
index 079fb3f..8c1a4a3 100644
--- a/examples/filter/src/main/resources/META-INF/properties.xml
+++ b/examples/filter/src/main/resources/META-INF/properties.xml
@@ -33,7 +33,7 @@
</property>
<property>
<name>dt.application.FilterExample.operator.csvParser.port.out.attr.TUPLE_CLASS</name>
- <value>com.datatorrent.tutorial.filter.TransactionPOJO</value>
+ <value>org.apache.apex.examples.filter.TransactionPOJO</value>
</property>
<property>
<name>dt.application.FilterExample.operator.selectedOutput.prop.maxIdleWindows</name>
@@ -45,15 +45,15 @@
</property>
<property>
<name>dt.application.FilterExample.operator.filterOperator.port.input.attr.TUPLE_CLASS</name>
- <value>com.datatorrent.tutorial.filter.TransactionPOJO</value>
+ <value>org.apache.apex.examples.filter.TransactionPOJO</value>
</property>
<property>
<name>dt.application.FilterExample.operator.selectedFormatter.port.in.attr.TUPLE_CLASS</name>
- <value>com.datatorrent.tutorial.filter.TransactionPOJO</value>
+ <value>org.apache.apex.examples.filter.TransactionPOJO</value>
</property>
<property>
<name>dt.application.FilterExample.operator.rejectedFormatter.port.in.attr.TUPLE_CLASS</name>
- <value>com.datatorrent.tutorial.filter.TransactionPOJO</value>
+ <value>org.apache.apex.examples.filter.TransactionPOJO</value>
</property>
<property>
<name>dt.application.FilterExample.operator.filterOperator.prop.condition</name>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/filter/src/test/java/com/datatorrent/tutorial/filter/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/filter/src/test/java/com/datatorrent/tutorial/filter/ApplicationTest.java b/examples/filter/src/test/java/com/datatorrent/tutorial/filter/ApplicationTest.java
deleted file mode 100644
index a90e822..0000000
--- a/examples/filter/src/test/java/com/datatorrent/tutorial/filter/ApplicationTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Copyright (c) 2016 DataTorrent, Inc.
- * All rights reserved.
- */
-
-package com.datatorrent.tutorial.filter;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.Callable;
-
-import javax.validation.ConstraintViolationException;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.stram.StramLocalCluster;
-
-/**
- * Test the DAG declaration in local mode.
- */
-public class ApplicationTest
-{
- private String outputDir;
-
- public static class TestMeta extends TestWatcher
- {
- public String baseDirectory;
-
- @Override
- protected void starting(org.junit.runner.Description description)
- {
- this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
- }
-
- @Override
- protected void finished(Description description)
- {
- super.finished(description);
- try {
- FileUtils.forceDelete(new File(baseDirectory));
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- }
-
- @Rule
- public TestMeta testMeta = new TestMeta();
-
- @Before
- public void setup() throws Exception
- {
- outputDir = testMeta.baseDirectory + File.separator + "output";
- }
-
- @Test
- public void testApplication() throws IOException, Exception
- {
-
- try {
- LocalMode lma = LocalMode.newInstance();
- Configuration conf = new Configuration(false);
- conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
- conf.set("dt.application.FilterExample.operator.selectedOutput.prop.filePath", outputDir);
- conf.set("dt.application.FilterExample.operator.rejectedOutput.prop.filePath", outputDir);
- final File selectedfile = FileUtils.getFile(outputDir, "selected.txt_8.0");
- final File rejectedfile = FileUtils.getFile(outputDir, "rejected.txt_6.0");
-
- lma.prepareDAG(new Application(), conf);
- LocalMode.Controller lc = lma.getController();
-
- ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
- {
- @Override
- public Boolean call() throws Exception
- {
- if (selectedfile.exists() && rejectedfile.exists()) {
- return true;
- }
- return false;
- }
- });
-
- lc.run(40000);
- Assert.assertTrue(
- FileUtils.contentEquals(
- FileUtils.getFile(
- "src/main/resources/META-INF/selected_output.txt"
- ),selectedfile));
-
- Assert.assertTrue(
- FileUtils.contentEquals(
- FileUtils.getFile(
- "src/main/resources/META-INF/rejected_output.txt"
- ),rejectedfile));
-
- } catch (ConstraintViolationException e) {
- Assert.fail("constraint violations: " + e.getConstraintViolations());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/filter/src/test/java/org/apache/apex/examples/filter/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/filter/src/test/java/org/apache/apex/examples/filter/ApplicationTest.java b/examples/filter/src/test/java/org/apache/apex/examples/filter/ApplicationTest.java
new file mode 100644
index 0000000..2170753
--- /dev/null
+++ b/examples/filter/src/test/java/org/apache/apex/examples/filter/ApplicationTest.java
@@ -0,0 +1,96 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc.
+ * All rights reserved.
+ */
+
+package org.apache.apex.examples.filter;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class ApplicationTest
+{
+ private String outputDir;
+
+ public static class TestMeta extends TestWatcher
+ {
+ public String baseDirectory;
+
+ @Override
+ protected void starting(org.junit.runner.Description description)
+ {
+ this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ super.finished(description);
+ try {
+ FileUtils.forceDelete(new File(baseDirectory));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+ @Before
+ public void setup() throws Exception
+ {
+ outputDir = testMeta.baseDirectory + File.separator + "output";
+ }
+
+ @Test
+ public void testApplication() throws IOException, Exception
+ {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ conf.set("dt.application.FilterExample.operator.selectedOutput.prop.filePath", outputDir);
+ conf.set("dt.application.FilterExample.operator.rejectedOutput.prop.filePath", outputDir);
+ final File selectedfile = FileUtils.getFile(outputDir, "selected.txt_8.0");
+ final File rejectedfile = FileUtils.getFile(outputDir, "rejected.txt_6.0");
+
+ lma.prepareDAG(new Application(), conf);
+ LocalMode.Controller lc = lma.getController();
+
+ ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ if (selectedfile.exists() && rejectedfile.exists()) {
+ return true;
+ }
+ return false;
+ }
+ });
+
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/innerjoin/pom.xml
----------------------------------------------------------------------
diff --git a/examples/innerjoin/pom.xml b/examples/innerjoin/pom.xml
index 29c1b90..78193ca 100644
--- a/examples/innerjoin/pom.xml
+++ b/examples/innerjoin/pom.xml
@@ -2,266 +2,49 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>com.example</groupId>
- <version>1.0-SNAPSHOT</version>
- <artifactId>innerjoin</artifactId>
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.7.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>malhar-examples-innerjoin</artifactId>
<packaging>jar</packaging>
<name>Inner Join Application</name>
<description>Sample Application for Inner Join</description>
- <properties>
- <!-- change this if you desire to use a different version of Apex Core -->
- <apex.version>3.5.0</apex.version>
- <malhar.version>3.6.0</malhar.version>
- <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.9</version>
- <configuration>
- <downloadSources>true</downloadSources>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.3</version>
- <configuration>
- <encoding>UTF-8</encoding>
- <source>1.7</source>
- <target>1.7</target>
- <debug>true</debug>
- <optimize>false</optimize>
- <showDeprecation>true</showDeprecation>
- <showWarnings>true</showWarnings>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>2.8</version>
- <executions>
- <execution>
- <id>copy-dependencies</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>target/deps</outputDirectory>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <executions>
- <execution>
- <id>app-package-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
- <appendAssemblyId>false</appendAssemblyId>
- <descriptors>
- <descriptor>src/assemble/appPackage.xml</descriptor>
- </descriptors>
- <archiverConfig>
- <defaultDirectoryMode>0755</defaultDirectoryMode>
- </archiverConfig>
- <archive>
- <manifestEntries>
- <Class-Path>${apex.apppackage.classpath}</Class-Path>
- <DT-Engine-Version>${apex.version}</DT-Engine-Version>
- <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
- <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
- <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
- <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
- <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
- </manifestEntries>
- </archive>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <execution>
- <phase>package</phase>
- <configuration>
- <target>
- <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
- tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
- </target>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- <execution>
- <!-- create resource directory for xml javadoc-->
- <id>createJavadocDirectory</id>
- <phase>generate-resources</phase>
- <configuration>
- <tasks>
- <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- </tasks>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.9.1</version>
- <executions>
- <execution>
- <id>attach-artifacts</id>
- <phase>package</phase>
- <goals>
- <goal>attach-artifact</goal>
- </goals>
- <configuration>
- <artifacts>
- <artifact>
- <file>target/${project.artifactId}-${project.version}.apa</file>
- <type>apa</type>
- </artifact>
- </artifacts>
- <skipAttach>false</skipAttach>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- generate javdoc -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <executions>
- <!-- generate xml javadoc -->
- <execution>
- <id>xml-doclet</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>javadoc</goal>
- </goals>
- <configuration>
- <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
- <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
- <useStandardDocletOptions>false</useStandardDocletOptions>
- <docletArtifact>
- <groupId>com.github.markusbernhardt</groupId>
- <artifactId>xml-doclet</artifactId>
- <version>1.0.4</version>
- </docletArtifact>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>xml-maven-plugin</artifactId>
- <version>1.0</version>
- <executions>
- <execution>
- <id>transform-xmljavadoc</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>transform</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <transformationSets>
- <transformationSet>
- <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
- <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
- </transformationSet>
- </transformationSets>
- </configuration>
- </plugin>
- <!-- copy xml javadoc to class jar -->
- <plugin>
- <artifactId>maven-resources-plugin</artifactId>
- <version>2.6</version>
- <executions>
- <execution>
- <id>copy-resources</id>
- <phase>process-resources</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>${basedir}/target/classes</outputDirectory>
- <resources>
- <resource>
- <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <filtering>true</filtering>
- </resource>
- </resources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- </plugins>
-
- </build>
-
<dependencies>
- <!-- add your dependencies here -->
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-library</artifactId>
- <version>${malhar.version}</version>
+ <version>${project.version}</version>
+ <!--
+ If you know that your application does not need transitive dependencies pulled in by malhar-library,
+ uncomment the following to reduce the size of your app package.
+ -->
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>apex-common</artifactId>
- <version>${apex.version}</version>
- <scope>provided</scope>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.14.1</version>
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.10</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>apex-engine</artifactId>
- <version>${apex.version}</version>
- <scope>test</scope>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
- <artifactId>janino</artifactId>
+ <artifactId>commons-compiler</artifactId>
<version>2.7.8</version>
+ <type>jar</type>
</dependency>
</dependencies>
-
</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/innerjoin/src/main/java/com/example/join/InnerJoinApplication.java
----------------------------------------------------------------------
diff --git a/examples/innerjoin/src/main/java/com/example/join/InnerJoinApplication.java b/examples/innerjoin/src/main/java/com/example/join/InnerJoinApplication.java
deleted file mode 100644
index 0b2f663..0000000
--- a/examples/innerjoin/src/main/java/com/example/join/InnerJoinApplication.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package com.example.join;
-
-import org.apache.apex.malhar.lib.join.POJOInnerJoinOperator;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.common.partitioner.StatelessPartitioner;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-
-@ApplicationAnnotation(name="InnerJoinExample")
-public class InnerJoinApplication implements StreamingApplication
-{
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
- // SalesEvent Generator
- POJOGenerator salesGenerator = dag.addOperator("Input1", new POJOGenerator());
- // ProductEvent Generator
- POJOGenerator productGenerator = dag.addOperator("Input2", new POJOGenerator());
- productGenerator.setSalesEvent(false);
-
- // Inner join Operator
- POJOInnerJoinOperator join = dag.addOperator("Join", new POJOInnerJoinOperator());
- ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
-
- // Streams
- dag.addStream("SalesToJoin", salesGenerator.output, join.input1);
- dag.addStream("ProductToJoin", productGenerator.output, join.input2);
- dag.addStream("JoinToConsole", join.outputPort, output.input);
-
- // Setting tuple class properties to the ports of join operator
- dag.setInputPortAttribute(join.input1, Context.PortContext.TUPLE_CLASS, POJOGenerator.SalesEvent.class);
- dag.setInputPortAttribute(join.input2, Context.PortContext.TUPLE_CLASS, POJOGenerator.ProductEvent.class);
- dag.setOutputPortAttribute(join.outputPort,Context.PortContext.TUPLE_CLASS, POJOGenerator.SalesEvent.class);
- }
-}