You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/27 15:19:19 UTC
[18/19] apex-malhar git commit: APEXMALHAR-2233 Updated the examples
to follow the structure of apex-malhar examples. Specified dependencies in
pom.xmls of individual examples correctly.
APEXMALHAR-2233 Updated the examples to follow the structure of apex-malhar examples. Specified dependencies in pom.xmls of individual examples correctly.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9c154f20
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9c154f20
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9c154f20
Branch: refs/heads/master
Commit: 9c154f204042a9e1974c2466e8783505b2c6da03
Parents: 8e20097
Author: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Authored: Sun Mar 19 22:40:04 2017 -0700
Committer: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Committed: Sun Mar 26 11:43:48 2017 -0700
----------------------------------------------------------------------
examples/csvformatter/pom.xml | 299 +++----------------
.../java/com/demo/myapexapp/Application.java | 45 ---
.../com/demo/myapexapp/HDFSOutputOperator.java | 87 ------
.../java/com/demo/myapexapp/JsonGenerator.java | 78 -----
.../main/java/com/demo/myapexapp/PojoEvent.java | 141 ---------
.../apex/examples/csvformatter/Application.java | 37 +++
.../csvformatter/HDFSOutputOperator.java | 87 ++++++
.../examples/csvformatter/JsonGenerator.java | 76 +++++
.../apex/examples/csvformatter/PojoEvent.java | 141 +++++++++
.../src/main/resources/META-INF/properties.xml | 2 +-
.../com/demo/myapexapp/ApplicationTest.java | 67 -----
.../examples/csvformatter/ApplicationTest.java | 65 ++++
examples/dedup/pom.xml | 277 +----------------
.../java/com/example/dedup/Application.java | 123 --------
.../apache/apex/examples/dedup/Application.java | 122 ++++++++
.../java/com/example/dedup/ApplicationTest.java | 38 ---
.../apex/examples/dedup/ApplicationTest.java | 37 +++
examples/dynamic-partition/pom.xml | 270 +----------------
.../src/main/java/com/example/dynamic/App.java | 23 --
.../src/main/java/com/example/dynamic/Gen.java | 169 -----------
.../org/apache/apex/examples/dynamic/App.java | 23 ++
.../org/apache/apex/examples/dynamic/Gen.java | 171 +++++++++++
.../com/example/dynamic/ApplicationTest.java | 34 ---
.../apex/examples/dynamic/ApplicationTest.java | 33 ++
examples/enricher/pom.xml | 298 ++----------------
.../com/example/myapexapp/DataGenerator.java | 94 ------
.../myapexapp/EnricherAppWithJSONFile.java | 47 ---
.../example/myapexapp/LineOutputOperator.java | 34 ---
.../main/java/com/example/myapexapp/POJO.java | 49 ---
.../com/example/myapexapp/POJOEnriched.java | 71 -----
.../apex/examples/enricher/DataGenerator.java | 94 ++++++
.../enricher/EnricherAppWithJSONFile.java | 47 +++
.../examples/enricher/LineOutputOperator.java | 34 +++
.../org/apache/apex/examples/enricher/POJO.java | 49 +++
.../apex/examples/enricher/POJOEnriched.java | 71 +++++
.../src/main/resources/META-INF/properties.xml | 6 +-
.../com/example/myapexapp/ApplicationTest.java | 31 --
.../apex/examples/enricher/ApplicationTest.java | 31 ++
examples/filter/pom.xml | 276 +----------------
.../tutorial/filter/Application.java | 49 ---
.../tutorial/filter/TransactionPOJO.java | 64 ----
.../apex/examples/filter/Application.java | 49 +++
.../apex/examples/filter/TransactionPOJO.java | 62 ++++
.../src/main/resources/META-INF/properties.xml | 8 +-
.../tutorial/filter/ApplicationTest.java | 111 -------
.../apex/examples/filter/ApplicationTest.java | 96 ++++++
examples/innerjoin/pom.xml | 269 ++---------------
.../com/example/join/InnerJoinApplication.java | 39 ---
.../java/com/example/join/POJOGenerator.java | 260 ----------------
.../innerjoin/InnerJoinApplication.java | 38 +++
.../apex/examples/innerjoin/POJOGenerator.java | 260 ++++++++++++++++
.../example/join/InnerJoinApplicationTest.java | 21 --
.../innerjoin/InnerJoinApplicationTest.java | 21 ++
examples/parser/pom.xml | 268 ++---------------
.../tutorial/jsonparser/Application.java | 35 ---
.../tutorial/jsonparser/Campaign.java | 74 -----
.../tutorial/jsonparser/JsonGenerator.java | 83 -----
.../examples/parser/jsonparser/Application.java | 35 +++
.../examples/parser/jsonparser/Campaign.java | 74 +++++
.../parser/jsonparser/JsonGenerator.java | 83 +++++
.../src/main/resources/META-INF/properties.xml | 4 +-
.../tutorial/jsonparser/ApplicationTest.java | 36 ---
.../parser/jsonparser/ApplicationTest.java | 35 +++
examples/partition/pom.xml | 276 +----------------
.../java/com/example/myapexapp/Application.java | 27 --
.../main/java/com/example/myapexapp/Codec3.java | 13 -
.../myapexapp/RandomNumberGenerator.java | 83 -----
.../com/example/myapexapp/TestPartition.java | 164 ----------
.../apex/examples/partition/Application.java | 25 ++
.../apache/apex/examples/partition/Codec3.java | 13 +
.../partition/RandomNumberGenerator.java | 76 +++++
.../apex/examples/partition/TestPartition.java | 149 +++++++++
.../src/main/resources/my-log4j.properties | 2 +-
.../com/example/myapexapp/ApplicationTest.java | 37 ---
.../examples/partition/ApplicationTest.java | 36 +++
examples/pom.xml | 10 +
examples/recordReader/pom.xml | 284 +-----------------
.../com/example/recordReader/Application.java | 32 --
.../recordReader/TransactionsSchema.java | 168 -----------
.../apex/examples/recordReader/Application.java | 32 ++
.../recordReader/TransactionsSchema.java | 168 +++++++++++
.../src/main/resources/META-INF/properties.xml | 4 +-
.../example/recordReader/ApplicationTest.java | 91 ------
.../examples/recordReader/ApplicationTest.java | 91 ++++++
examples/throttle/pom.xml | 256 +---------------
.../examples/throttle/Application.java | 51 ----
.../examples/throttle/PassThroughOperator.java | 20 --
.../throttle/RandomNumberGenerator.java | 64 ----
.../examples/throttle/SlowDevNullOperator.java | 35 ---
.../throttle/ThrottlingStatsListener.java | 150 ----------
.../apex/examples/throttle/Application.java | 51 ++++
.../examples/throttle/PassThroughOperator.java | 20 ++
.../throttle/RandomNumberGenerator.java | 64 ++++
.../examples/throttle/SlowDevNullOperator.java | 35 +++
.../throttle/ThrottlingStatsListener.java | 150 ++++++++++
.../examples/throttle/ApplicationTest.java | 37 ---
.../apex/examples/throttle/ApplicationTest.java | 36 +++
examples/transform/pom.xml | 250 +---------------
.../java/com/example/transform/Application.java | 39 ---
.../com/example/transform/CustomerEvent.java | 74 -----
.../com/example/transform/CustomerInfo.java | 60 ----
.../transform/DynamicTransformApplication.java | 52 ----
.../com/example/transform/POJOGenerator.java | 125 --------
.../apex/examples/transform/Application.java | 39 +++
.../apex/examples/transform/CustomerEvent.java | 74 +++++
.../apex/examples/transform/CustomerInfo.java | 60 ++++
.../transform/DynamicTransformApplication.java | 51 ++++
.../apex/examples/transform/POJOGenerator.java | 125 ++++++++
.../com/example/transform/ApplicationTest.java | 21 --
.../examples/transform/ApplicationTest.java | 21 ++
110 files changed, 3404 insertions(+), 6088 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/csvformatter/pom.xml b/examples/csvformatter/pom.xml
index 9033db5..be3be7a 100644
--- a/examples/csvformatter/pom.xml
+++ b/examples/csvformatter/pom.xml
@@ -1,266 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>com.example</groupId>
- <version>1.0-SNAPSHOT</version>
- <artifactId>formatter</artifactId>
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.7.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>malhar-examples-csvformatter</artifactId>
<packaging>jar</packaging>
<!-- change these to the appropriate values -->
<name>Formatter Apps</name>
<description>Applications to showcase different formatters</description>
- <properties>
- <!-- change this if you desire to use a different version of Apex Core -->
- <apex.version>3.5.0</apex.version>
- <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
- <malhar.version>3.6.0</malhar.version>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.9</version>
- <configuration>
- <downloadSources>true</downloadSources>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.3</version>
- <configuration>
- <encoding>UTF-8</encoding>
- <source>1.7</source>
- <target>1.7</target>
- <debug>true</debug>
- <optimize>false</optimize>
- <showDeprecation>true</showDeprecation>
- <showWarnings>true</showWarnings>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>2.8</version>
- <executions>
- <execution>
- <id>copy-dependencies</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>target/deps</outputDirectory>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <executions>
- <execution>
- <id>app-package-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
- <appendAssemblyId>false</appendAssemblyId>
- <descriptors>
- <descriptor>src/assemble/appPackage.xml</descriptor>
- </descriptors>
- <archiverConfig>
- <defaultDirectoryMode>0755</defaultDirectoryMode>
- </archiverConfig>
- <archive>
- <manifestEntries>
- <Class-Path>${apex.apppackage.classpath}</Class-Path>
- <DT-Engine-Version>${apex.version}</DT-Engine-Version>
- <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
- <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
- <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
- <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
- <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
- </manifestEntries>
- </archive>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <execution>
- <phase>package</phase>
- <configuration>
- <target>
- <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
- tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
- </target>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- <execution>
- <!-- create resource directory for xml javadoc-->
- <id>createJavadocDirectory</id>
- <phase>generate-resources</phase>
- <configuration>
- <tasks>
- <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- </tasks>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.9.1</version>
- <executions>
- <execution>
- <id>attach-artifacts</id>
- <phase>package</phase>
- <goals>
- <goal>attach-artifact</goal>
- </goals>
- <configuration>
- <artifacts>
- <artifact>
- <file>target/${project.artifactId}-${project.version}.apa</file>
- <type>apa</type>
- </artifact>
- </artifacts>
- <skipAttach>false</skipAttach>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- generate javdoc -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <executions>
- <!-- generate xml javadoc -->
- <execution>
- <id>xml-doclet</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>javadoc</goal>
- </goals>
- <configuration>
- <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
- <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
- <useStandardDocletOptions>false</useStandardDocletOptions>
- <docletArtifact>
- <groupId>com.github.markusbernhardt</groupId>
- <artifactId>xml-doclet</artifactId>
- <version>1.0.4</version>
- </docletArtifact>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>xml-maven-plugin</artifactId>
- <version>1.0</version>
- <executions>
- <execution>
- <id>transform-xmljavadoc</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>transform</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <transformationSets>
- <transformationSet>
- <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
- <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
- </transformationSet>
- </transformationSets>
- </configuration>
- </plugin>
- <!-- copy xml javadoc to class jar -->
- <plugin>
- <artifactId>maven-resources-plugin</artifactId>
- <version>2.6</version>
- <executions>
- <execution>
- <id>copy-resources</id>
- <phase>process-resources</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>${basedir}/target/classes</outputDirectory>
- <resources>
- <resource>
- <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <filtering>true</filtering>
- </resource>
- </resources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- </plugins>
-
- </build>
-
<dependencies>
- <!-- add your dependencies here -->
<dependency>
<groupId>org.apache.apex</groupId>
- <artifactId>malhar-library</artifactId>
- <version>${malhar.version}</version>
- <!--
- If you know that your application does not need transitive dependencies pulled in by malhar-library,
- uncomment the following to reduce the size of your app package.
- -->
- <!--
+ <artifactId>malhar-contrib</artifactId>
+ <version>${project.version}</version>
<exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
</exclusions>
- -->
</dependency>
<dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-contrib</artifactId>
- <version>${malhar.version}</version>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ <version>1.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
- <version>2.5.4</version>
+ <version>2.7.0</version>
</dependency>
<dependency>
<groupId>com.github.fge</groupId>
@@ -269,29 +64,9 @@
<optional>true</optional>
</dependency>
<dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>apex-common</artifactId>
- <version>${apex.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.10</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>apex-engine</artifactId>
- <version>${apex.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>net.sf.supercsv</groupId>
<artifactId>super-csv</artifactId>
<version>2.4.0</version>
</dependency>
</dependencies>
-
</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/com/demo/myapexapp/Application.java
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/main/java/com/demo/myapexapp/Application.java b/examples/csvformatter/src/main/java/com/demo/myapexapp/Application.java
deleted file mode 100644
index a4ff06f..0000000
--- a/examples/csvformatter/src/main/java/com/demo/myapexapp/Application.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package com.demo.myapexapp;
-
-import java.util.Arrays;
-
-import com.datatorrent.contrib.parser.JsonParser;
-
-import org.apache.apex.malhar.contrib.parser.StreamingJsonParser;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StatsListener;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.formatter.CsvFormatter;
-import com.datatorrent.lib.appdata.schemas.SchemaUtils;
-import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner;
-import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner;
-
-@ApplicationAnnotation(name = "CustomOutputFormatter")
-public class Application implements StreamingApplication
-{
- //Set the delimiters and schema structure for the custom output in schema.json
- private static final String filename = "schema.json";
-
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
- JsonGenerator generator = dag.addOperator("JsonGenerator", JsonGenerator.class);
- JsonParser jsonParser = dag.addOperator("jsonParser", JsonParser.class);
-
- CsvFormatter formatter = dag.addOperator("formatter", CsvFormatter.class);
- formatter.setSchema(SchemaUtils.jarResourceFileToString(filename));
- dag.setInputPortAttribute(formatter.in, PortContext.TUPLE_CLASS, PojoEvent.class);
-
- HDFSOutputOperator<String> hdfsOutput = dag.addOperator("HDFSOutputOperator", HDFSOutputOperator.class);
- hdfsOutput.setLineDelimiter("");
-
- dag.addStream("parserStream", generator.out, jsonParser.in);
- dag.addStream("formatterStream", jsonParser.out, formatter.in);
- dag.addStream("outputStream", formatter.out, hdfsOutput.input);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/com/demo/myapexapp/HDFSOutputOperator.java
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/main/java/com/demo/myapexapp/HDFSOutputOperator.java b/examples/csvformatter/src/main/java/com/demo/myapexapp/HDFSOutputOperator.java
deleted file mode 100644
index 5cb162c..0000000
--- a/examples/csvformatter/src/main/java/com/demo/myapexapp/HDFSOutputOperator.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package com.demo.myapexapp;
-
-import javax.validation.constraints.NotNull;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
-
-/**
- * HDFSoutput operator with implementation to write Objects to HDFS
- *
- * @param <T>
- */
-public class HDFSOutputOperator<T> extends AbstractFileOutputOperator<T>
-{
-
- @NotNull
- String outFileName;
-
- //setting default value
- String lineDelimiter = "\n";
-
- //Switch to write the files to HDFS - set to false to diable writes
- private boolean writeFilesFlag = true;
-
- int id;
-
- @Override
- public void setup(OperatorContext context)
- {
- super.setup(context);
- id = context.getId();
- }
-
- public boolean isWriteFilesFlag()
- {
- return writeFilesFlag;
- }
-
- public void setWriteFilesFlag(boolean writeFilesFlag)
- {
- this.writeFilesFlag = writeFilesFlag;
- }
-
- public String getOutFileName()
- {
- return outFileName;
- }
-
- public void setOutFileName(String outFileName)
- {
- this.outFileName = outFileName;
- }
-
- @Override
- protected String getFileName(T tuple)
- {
- return getOutFileName() + id;
- }
-
- public String getLineDelimiter()
- {
- return lineDelimiter;
- }
-
- public void setLineDelimiter(String lineDelimiter)
- {
- this.lineDelimiter = lineDelimiter;
- }
-
- @Override
- protected byte[] getBytesForTuple(T tuple)
- {
- String temp = tuple.toString().concat(String.valueOf(lineDelimiter));
- byte[] theByteArray = temp.getBytes();
-
- return theByteArray;
- }
-
- @Override
- protected void processTuple(T tuple)
- {
- if (writeFilesFlag) {
- }
- super.processTuple(tuple);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/com/demo/myapexapp/JsonGenerator.java
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/main/java/com/demo/myapexapp/JsonGenerator.java b/examples/csvformatter/src/main/java/com/demo/myapexapp/JsonGenerator.java
deleted file mode 100644
index f50f300..0000000
--- a/examples/csvformatter/src/main/java/com/demo/myapexapp/JsonGenerator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package com.demo.myapexapp;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Random;
-
-import javax.validation.constraints.Min;
-
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.common.util.BaseOperator;
-
-public class JsonGenerator extends BaseOperator implements InputOperator
-{
-
- private static final Logger LOG = LoggerFactory.getLogger(JsonGenerator.class);
-
- @Min(1)
- private int numTuples = 20;
- private transient int count = 0;
-
- public static Random rand = new Random();
- private int sleepTime=5;
-
- public final transient DefaultOutputPort<byte[]> out = new DefaultOutputPort<byte[]>();
-
- private static String getJson()
- {
-
- JSONObject obj = new JSONObject();
- try {
- obj.put("campaignId", 1234);
- obj.put("campaignName", "SimpleCsvFormatterExample");
- obj.put("campaignBudget", 10000.0);
- obj.put("weatherTargeting", "false");
- obj.put("securityCode", "APEX");
- } catch (JSONException e) {
- return null;
- }
- return obj.toString();
- }
-
- @Override
- public void beginWindow(long windowId)
- {
- count = 0;
- }
-
- @Override
- public void emitTuples()
- {
- if (count++ < numTuples) {
- out.emit(getJson().getBytes());
- } else {
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- LOG.info("Sleep interrupted");
- }
- }
- }
-
- public int getNumTuples()
- {
- return numTuples;
- }
-
- public void setNumTuples(int numTuples)
- {
- this.numTuples = numTuples;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/com/demo/myapexapp/PojoEvent.java
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/main/java/com/demo/myapexapp/PojoEvent.java b/examples/csvformatter/src/main/java/com/demo/myapexapp/PojoEvent.java
deleted file mode 100644
index 8514856..0000000
--- a/examples/csvformatter/src/main/java/com/demo/myapexapp/PojoEvent.java
+++ /dev/null
@@ -1,141 +0,0 @@
-package com.demo.myapexapp;
-
-import java.util.Date;
-
-public class PojoEvent
-{
-
- private int advId;
- private int campaignId;
- private String campaignName;
- private double campaignBudget;
- private Date startDate;
- private Date endDate;
- private String securityCode;
- private boolean weatherTargeting;
- private boolean optimized;
- private String parentCampaign;
- private Character weatherTargeted;
- private String valid;
-
- public int getAdvId()
- {
- return advId;
- }
-
- public void setAdvId(int AdId)
- {
- this.advId = advId;
- }
-
- public int getCampaignId()
- {
- return campaignId;
- }
-
- public void setCampaignId(int campaignId)
- {
- this.campaignId = campaignId;
- }
-
- public String getCampaignName()
- {
- return campaignName;
- }
-
- public void setCampaignName(String campaignName)
- {
- this.campaignName = campaignName;
- }
-
- public double getCampaignBudget()
- {
- return campaignBudget;
- }
-
- public void setCampaignBudget(double campaignBudget)
- {
- this.campaignBudget = campaignBudget;
- }
-
- public Date getStartDate()
- {
- return startDate;
- }
-
- public void setStartDate(Date startDate)
- {
- this.startDate = startDate;
- }
-
- public Date getEndDate()
- {
- return endDate;
- }
-
- public void setEndDate(Date endDate)
- {
- this.endDate = endDate;
- }
-
- public String getSecurityCode()
- {
- return securityCode;
- }
-
- public void setSecurityCode(String securityCode)
- {
- this.securityCode = securityCode;
- }
-
- public boolean isWeatherTargeting()
- {
- return weatherTargeting;
- }
-
- public void setWeatherTargeting(boolean weatherTargeting)
- {
- this.weatherTargeting = weatherTargeting;
- }
-
- public boolean isOptimized()
- {
- return optimized;
- }
-
- public void setOptimized(boolean optimized)
- {
- this.optimized = optimized;
- }
-
- public String getParentCampaign()
- {
- return parentCampaign;
- }
-
- public void setParentCampaign(String parentCampaign)
- {
- this.parentCampaign = parentCampaign;
- }
-
- public Character getWeatherTargeted()
- {
- return weatherTargeted;
- }
-
- public void setWeatherTargeted(Character weatherTargeted)
- {
- this.weatherTargeted = weatherTargeted;
- }
-
- public String getValid()
- {
- return valid;
- }
-
- public void setValid(String valid)
- {
- this.valid = valid;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/Application.java
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/Application.java b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/Application.java
new file mode 100644
index 0000000..cc9ee79
--- /dev/null
+++ b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/Application.java
@@ -0,0 +1,37 @@
+package org.apache.apex.examples.csvformatter;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.formatter.CsvFormatter;
+import com.datatorrent.contrib.parser.JsonParser;
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+
+@ApplicationAnnotation(name = "CustomOutputFormatter")
+public class Application implements StreamingApplication
+{
+ //Set the delimiters and schema structure for the custom output in schema.json
+ private static final String filename = "schema.json";
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ JsonGenerator generator = dag.addOperator("JsonGenerator", JsonGenerator.class);
+ JsonParser jsonParser = dag.addOperator("jsonParser", JsonParser.class);
+
+ CsvFormatter formatter = dag.addOperator("formatter", CsvFormatter.class);
+ formatter.setSchema(SchemaUtils.jarResourceFileToString(filename));
+ dag.setInputPortAttribute(formatter.in, PortContext.TUPLE_CLASS, PojoEvent.class);
+
+ HDFSOutputOperator<String> hdfsOutput = dag.addOperator("HDFSOutputOperator", HDFSOutputOperator.class);
+ hdfsOutput.setLineDelimiter("");
+
+ dag.addStream("parserStream", generator.out, jsonParser.in);
+ dag.addStream("formatterStream", jsonParser.out, formatter.in);
+ dag.addStream("outputStream", formatter.out, hdfsOutput.input);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/HDFSOutputOperator.java
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/HDFSOutputOperator.java b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/HDFSOutputOperator.java
new file mode 100644
index 0000000..7cdd8bb
--- /dev/null
+++ b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/HDFSOutputOperator.java
@@ -0,0 +1,87 @@
+package org.apache.apex.examples.csvformatter;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+
+/**
+ * HDFSoutput operator with implementation to write Objects to HDFS
+ *
+ * @param <T>
+ */
+public class HDFSOutputOperator<T> extends AbstractFileOutputOperator<T>
+{
+
+ @NotNull
+ String outFileName;
+
+ //setting default value
+ String lineDelimiter = "\n";
+
+ //Switch to write the files to HDFS - set to false to diable writes
+ private boolean writeFilesFlag = true;
+
+ int id;
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ id = context.getId();
+ }
+
+ public boolean isWriteFilesFlag()
+ {
+ return writeFilesFlag;
+ }
+
+ public void setWriteFilesFlag(boolean writeFilesFlag)
+ {
+ this.writeFilesFlag = writeFilesFlag;
+ }
+
+ public String getOutFileName()
+ {
+ return outFileName;
+ }
+
+ public void setOutFileName(String outFileName)
+ {
+ this.outFileName = outFileName;
+ }
+
+ @Override
+ protected String getFileName(T tuple)
+ {
+ return getOutFileName() + id;
+ }
+
+ public String getLineDelimiter()
+ {
+ return lineDelimiter;
+ }
+
+ public void setLineDelimiter(String lineDelimiter)
+ {
+ this.lineDelimiter = lineDelimiter;
+ }
+
+ @Override
+ protected byte[] getBytesForTuple(T tuple)
+ {
+ String temp = tuple.toString().concat(String.valueOf(lineDelimiter));
+ byte[] theByteArray = temp.getBytes();
+
+ return theByteArray;
+ }
+
+ @Override
+ protected void processTuple(T tuple)
+ {
+ if (writeFilesFlag) {
+ }
+ super.processTuple(tuple);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/JsonGenerator.java
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/JsonGenerator.java b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/JsonGenerator.java
new file mode 100644
index 0000000..9b7698c
--- /dev/null
+++ b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/JsonGenerator.java
@@ -0,0 +1,76 @@
+package org.apache.apex.examples.csvformatter;
+
+import java.util.Random;
+
+import javax.validation.constraints.Min;
+
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+public class JsonGenerator extends BaseOperator implements InputOperator
+{
+
+ private static final Logger LOG = LoggerFactory.getLogger(JsonGenerator.class);
+
+ @Min(1)
+ private int numTuples = 20;
+ private transient int count = 0;
+
+ public static Random rand = new Random();
+ private int sleepTime=5;
+
+ public final transient DefaultOutputPort<byte[]> out = new DefaultOutputPort<byte[]>();
+
+ private static String getJson()
+ {
+
+ JSONObject obj = new JSONObject();
+ try {
+ obj.put("campaignId", 1234);
+ obj.put("campaignName", "SimpleCsvFormatterExample");
+ obj.put("campaignBudget", 10000.0);
+ obj.put("weatherTargeting", "false");
+ obj.put("securityCode", "APEX");
+ } catch (JSONException e) {
+ return null;
+ }
+ return obj.toString();
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ count = 0;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ if (count++ < numTuples) {
+ out.emit(getJson().getBytes());
+ } else {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ LOG.info("Sleep interrupted");
+ }
+ }
+ }
+
+ public int getNumTuples()
+ {
+ return numTuples;
+ }
+
+ public void setNumTuples(int numTuples)
+ {
+ this.numTuples = numTuples;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/PojoEvent.java
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/PojoEvent.java b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/PojoEvent.java
new file mode 100644
index 0000000..03fda93
--- /dev/null
+++ b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/PojoEvent.java
@@ -0,0 +1,141 @@
+package org.apache.apex.examples.csvformatter;
+
+import java.util.Date;
+
/**
 * Plain POJO describing a campaign event; used as the TUPLE_CLASS for the
 * JSON parser / CSV formatter ports (see META-INF/properties.xml).
 */
public class PojoEvent
{

  private int advId;
  private int campaignId;
  private String campaignName;
  private double campaignBudget;
  private Date startDate;
  private Date endDate;
  private String securityCode;
  private boolean weatherTargeting;
  private boolean optimized;
  private String parentCampaign;
  private Character weatherTargeted;
  private String valid;

  public int getAdvId()
  {
    return advId;
  }

  public void setAdvId(int advId)
  {
    // Fix: the original declared the parameter as "AdId" and then executed
    // "this.advId = advId", a self-assignment - the setter silently did nothing.
    this.advId = advId;
  }

  public int getCampaignId()
  {
    return campaignId;
  }

  public void setCampaignId(int campaignId)
  {
    this.campaignId = campaignId;
  }

  public String getCampaignName()
  {
    return campaignName;
  }

  public void setCampaignName(String campaignName)
  {
    this.campaignName = campaignName;
  }

  public double getCampaignBudget()
  {
    return campaignBudget;
  }

  public void setCampaignBudget(double campaignBudget)
  {
    this.campaignBudget = campaignBudget;
  }

  public Date getStartDate()
  {
    return startDate;
  }

  public void setStartDate(Date startDate)
  {
    this.startDate = startDate;
  }

  public Date getEndDate()
  {
    return endDate;
  }

  public void setEndDate(Date endDate)
  {
    this.endDate = endDate;
  }

  public String getSecurityCode()
  {
    return securityCode;
  }

  public void setSecurityCode(String securityCode)
  {
    this.securityCode = securityCode;
  }

  public boolean isWeatherTargeting()
  {
    return weatherTargeting;
  }

  public void setWeatherTargeting(boolean weatherTargeting)
  {
    this.weatherTargeting = weatherTargeting;
  }

  public boolean isOptimized()
  {
    return optimized;
  }

  public void setOptimized(boolean optimized)
  {
    this.optimized = optimized;
  }

  public String getParentCampaign()
  {
    return parentCampaign;
  }

  public void setParentCampaign(String parentCampaign)
  {
    this.parentCampaign = parentCampaign;
  }

  public Character getWeatherTargeted()
  {
    return weatherTargeted;
  }

  public void setWeatherTargeted(Character weatherTargeted)
  {
    this.weatherTargeted = weatherTargeted;
  }

  public String getValid()
  {
    return valid;
  }

  public void setValid(String valid)
  {
    this.valid = valid;
  }

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/main/resources/META-INF/properties.xml b/examples/csvformatter/src/main/resources/META-INF/properties.xml
index ed2b5ce..8d67c93 100644
--- a/examples/csvformatter/src/main/resources/META-INF/properties.xml
+++ b/examples/csvformatter/src/main/resources/META-INF/properties.xml
@@ -20,7 +20,7 @@
<property>
<name>dt.application.CustomOutputFormatter.operator.jsonParser.port.out.attr.TUPLE_CLASS
</name>
- <value>com.demo.myapexapp.PojoEvent</value>
+ <value>org.apache.apex.examples.csvformatter.PojoEvent</value>
</property>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/test/java/com/demo/myapexapp/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/test/java/com/demo/myapexapp/ApplicationTest.java b/examples/csvformatter/src/test/java/com/demo/myapexapp/ApplicationTest.java
deleted file mode 100644
index efe5946..0000000
--- a/examples/csvformatter/src/test/java/com/demo/myapexapp/ApplicationTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package com.demo.myapexapp;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-
-import javax.validation.ConstraintViolationException;
-
-import org.apache.commons.io.FileUtils;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-
-import org.junit.Test;
-
-import com.datatorrent.api.LocalMode;
-
-/**
- * Test the DAG declaration in local mode.
- */
-public class ApplicationTest
-{
-
- private static final String FILE_NAME = "/tmp/formatterApp";
-
- @AfterClass
- public static void cleanup()
- {
- try {
- FileUtils.deleteDirectory(new File(FILE_NAME));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Test
- public void testApplication() throws Exception
- {
- try {
- LocalMode lma = LocalMode.newInstance();
- Configuration conf = new Configuration(false);
- conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
- lma.prepareDAG(new Application(), conf);
- LocalMode.Controller lc = lma.getController();
- lc.runAsync();
-
- // wait for output files to roll
- Thread.sleep(5000);
-
- String[] extensions = {"dat.0", "tmp"};
- Collection<File> list = FileUtils.listFiles(new File(FILE_NAME), extensions, false);
-
- for (File file : list) {
- for (String line : FileUtils.readLines(file)) {
- Assert.assertEquals("Delimiter in record", true, (line.equals(
- "1234|0|SimpleCsvFormatterExample|10000.0|||APEX|false|false||")));
- }
- }
-
- } catch (ConstraintViolationException e) {
- Assert.fail("constraint violations: " + e.getConstraintViolations());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/test/java/org/apache/apex/examples/csvformatter/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/test/java/org/apache/apex/examples/csvformatter/ApplicationTest.java b/examples/csvformatter/src/test/java/org/apache/apex/examples/csvformatter/ApplicationTest.java
new file mode 100644
index 0000000..67d5fd0
--- /dev/null
+++ b/examples/csvformatter/src/test/java/org/apache/apex/examples/csvformatter/ApplicationTest.java
@@ -0,0 +1,65 @@
+package org.apache.apex.examples.csvformatter;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class ApplicationTest
+{
+
+ private static final String FILE_NAME = "/tmp/formatterApp";
+
+ @AfterClass
+ public static void cleanup()
+ {
+ try {
+ FileUtils.deleteDirectory(new File(FILE_NAME));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testApplication() throws Exception
+ {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ lma.prepareDAG(new Application(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+
+ // wait for output files to roll
+ Thread.sleep(5000);
+
+ String[] extensions = {"dat.0", "tmp"};
+ Collection<File> list = FileUtils.listFiles(new File(FILE_NAME), extensions, false);
+
+ for (File file : list) {
+ for (String line : FileUtils.readLines(file)) {
+ Assert.assertEquals("Delimiter in record", true, (line.equals(
+ "1234|0|SimpleCsvFormatterExample|10000.0|||APEX|false|false||")));
+ }
+ }
+
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dedup/pom.xml
----------------------------------------------------------------------
diff --git a/examples/dedup/pom.xml b/examples/dedup/pom.xml
index f777784..ba5a24d 100644
--- a/examples/dedup/pom.xml
+++ b/examples/dedup/pom.xml
@@ -2,279 +2,30 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>com.example</groupId>
- <version>1.0-SNAPSHOT</version>
- <artifactId>dedup</artifactId>
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.7.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>malhar-examples-dedup</artifactId>
<packaging>jar</packaging>
<!-- change these to the appropriate values -->
- <name>My Apex Application</name>
- <description>My Apex Application Description</description>
-
- <properties>
- <!-- change this if you desire to use a different version of Apex Core -->
- <apex.version>3.5.0</apex.version>
- <malhar.version>3.6.0</malhar.version>
- <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.9</version>
- <configuration>
- <downloadSources>true</downloadSources>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.3</version>
- <configuration>
- <encoding>UTF-8</encoding>
- <source>1.7</source>
- <target>1.7</target>
- <debug>true</debug>
- <optimize>false</optimize>
- <showDeprecation>true</showDeprecation>
- <showWarnings>true</showWarnings>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>2.8</version>
- <executions>
- <execution>
- <id>copy-dependencies</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>target/deps</outputDirectory>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <executions>
- <execution>
- <id>app-package-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
- <appendAssemblyId>false</appendAssemblyId>
- <descriptors>
- <descriptor>src/assemble/appPackage.xml</descriptor>
- </descriptors>
- <archiverConfig>
- <defaultDirectoryMode>0755</defaultDirectoryMode>
- </archiverConfig>
- <archive>
- <manifestEntries>
- <Class-Path>${apex.apppackage.classpath}</Class-Path>
- <DT-Engine-Version>${apex.version}</DT-Engine-Version>
- <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
- <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
- <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
- <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
- <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
- </manifestEntries>
- </archive>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <execution>
- <phase>package</phase>
- <configuration>
- <target>
- <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
- tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
- </target>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- <execution>
- <!-- create resource directory for xml javadoc-->
- <id>createJavadocDirectory</id>
- <phase>generate-resources</phase>
- <configuration>
- <tasks>
- <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- </tasks>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.9.1</version>
- <executions>
- <execution>
- <id>attach-artifacts</id>
- <phase>package</phase>
- <goals>
- <goal>attach-artifact</goal>
- </goals>
- <configuration>
- <artifacts>
- <artifact>
- <file>target/${project.artifactId}-${project.version}.apa</file>
- <type>apa</type>
- </artifact>
- </artifacts>
- <skipAttach>false</skipAttach>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- generate javdoc -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <executions>
- <!-- generate xml javadoc -->
- <execution>
- <id>xml-doclet</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>javadoc</goal>
- </goals>
- <configuration>
- <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
- <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
- <useStandardDocletOptions>false</useStandardDocletOptions>
- <docletArtifact>
- <groupId>com.github.markusbernhardt</groupId>
- <artifactId>xml-doclet</artifactId>
- <version>1.0.4</version>
- </docletArtifact>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>xml-maven-plugin</artifactId>
- <version>1.0</version>
- <executions>
- <execution>
- <id>transform-xmljavadoc</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>transform</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <transformationSets>
- <transformationSet>
- <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
- <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
- </transformationSet>
- </transformationSets>
- </configuration>
- </plugin>
- <!-- copy xml javadoc to class jar -->
- <plugin>
- <artifactId>maven-resources-plugin</artifactId>
- <version>2.6</version>
- <executions>
- <execution>
- <id>copy-resources</id>
- <phase>process-resources</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>${basedir}/target/classes</outputDirectory>
- <resources>
- <resource>
- <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <filtering>true</filtering>
- </resource>
- </resources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- </plugins>
-
- </build>
+ <name>Dedup Application</name>
+ <description>Dedup Application</description>
<dependencies>
- <!-- add your dependencies here -->
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-library</artifactId>
- <version>${malhar.version}</version>
- <!--
- If you know that your application does not need transitive dependencies pulled in by malhar-library,
- uncomment the following to reduce the size of your app package.
- -->
- <!--
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- -->
- </dependency>
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>apex-common</artifactId>
- <version>${apex.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.10</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>apex-engine</artifactId>
- <version>${apex.version}</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>2.7.8</version>
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.9.1</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dedup/src/main/java/com/example/dedup/Application.java
----------------------------------------------------------------------
diff --git a/examples/dedup/src/main/java/com/example/dedup/Application.java b/examples/dedup/src/main/java/com/example/dedup/Application.java
deleted file mode 100644
index cabdce2..0000000
--- a/examples/dedup/src/main/java/com/example/dedup/Application.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Put your copyright and license info here.
- */
-package com.example.dedup;
-
-import java.util.Date;
-import java.util.Random;
-
-import org.apache.apex.malhar.lib.dedup.TimeBasedDedupOperator;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.common.partitioner.StatelessPartitioner;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-
-@ApplicationAnnotation(name="DedupExample")
-public class Application implements StreamingApplication
-{
-
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
- // Test Data Generator Operator
- RandomDataGeneratorOperator gen = dag.addOperator("RandomGenerator", new RandomDataGeneratorOperator());
-
- // Dedup Operator. Configuration through resources/META-INF/properties.xml
- TimeBasedDedupOperator dedup = dag.addOperator("Deduper", new TimeBasedDedupOperator());
-
- // Console output operator for unique tuples
- ConsoleOutputOperator consoleUnique = dag.addOperator("ConsoleUnique", new ConsoleOutputOperator());
-
- // Console output operator for duplicate tuples
- ConsoleOutputOperator consoleDuplicate = dag.addOperator("ConsoleDuplicate", new ConsoleOutputOperator());
-
- // Console output operator for duplicate tuples
- ConsoleOutputOperator consoleExpired = dag.addOperator("ConsoleExpired", new ConsoleOutputOperator());
-
- // Streams
- dag.addStream("Generator to Dedup", gen.output, dedup.input);
-
- // Connect Dedup unique to Console
- dag.addStream("Dedup Unique to Console", dedup.unique, consoleUnique.input);
- // Connect Dedup duplicate to Console
- dag.addStream("Dedup Duplicate to Console", dedup.duplicate, consoleDuplicate.input);
- // Connect Dedup expired to Console
- dag.addStream("Dedup Expired to Console", dedup.expired, consoleExpired.input);
-
- // Set Attribute TUPLE_CLASS for supplying schema information to the port
- dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class);
-
- // Uncomment the following line to create multiple partitions for Dedup operator. In this case: 2
- // dag.setAttribute(dedup, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TimeBasedDedupOperator>(2));
- }
-
- public static class RandomDataGeneratorOperator extends BaseOperator implements InputOperator
- {
-
- public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();
- private final transient Random r = new Random();
- private int tuplesPerWindow = 100;
- private transient int count = 0;
-
- @Override
- public void beginWindow(long windowId) {
- count = 0;
- }
-
- @Override
- public void emitTuples()
- {
- if (count++ > tuplesPerWindow) {
- return;
- }
- TestEvent event = new TestEvent();
- event.id = r.nextInt(100);
- event.eventTime = new Date(System.currentTimeMillis() - (r.nextInt(60 * 1000)));
- output.emit(event);
- }
- }
-
- public static class TestEvent
- {
- private int id;
- private Date eventTime;
-
- public TestEvent()
- {
- }
-
- public int getId()
- {
- return id;
- }
-
- public void setId(int id)
- {
- this.id = id;
- }
-
- public Date getEventTime()
- {
- return eventTime;
- }
-
- public void setEventTime(Date eventTime)
- {
- this.eventTime = eventTime;
- }
-
- @Override
- public String toString() {
- return "TestEvent [id=" + id + ", eventTime=" + eventTime + "]";
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java
----------------------------------------------------------------------
diff --git a/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java b/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java
new file mode 100644
index 0000000..2498d62
--- /dev/null
+++ b/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java
@@ -0,0 +1,122 @@
+/**
+ * Put your copyright and license info here.
+ */
+package org.apache.apex.examples.dedup;
+
+import java.util.Date;
+import java.util.Random;
+
+import org.apache.apex.malhar.lib.dedup.TimeBasedDedupOperator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+@ApplicationAnnotation(name="DedupExample")
+public class Application implements StreamingApplication
+{
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ // Test Data Generator Operator
+ RandomDataGeneratorOperator gen = dag.addOperator("RandomGenerator", new RandomDataGeneratorOperator());
+
+ // Dedup Operator. Configuration through resources/META-INF/properties.xml
+ TimeBasedDedupOperator dedup = dag.addOperator("Deduper", new TimeBasedDedupOperator());
+
+ // Console output operator for unique tuples
+ ConsoleOutputOperator consoleUnique = dag.addOperator("ConsoleUnique", new ConsoleOutputOperator());
+
+ // Console output operator for duplicate tuples
+ ConsoleOutputOperator consoleDuplicate = dag.addOperator("ConsoleDuplicate", new ConsoleOutputOperator());
+
+ // Console output operator for duplicate tuples
+ ConsoleOutputOperator consoleExpired = dag.addOperator("ConsoleExpired", new ConsoleOutputOperator());
+
+ // Streams
+ dag.addStream("Generator to Dedup", gen.output, dedup.input);
+
+ // Connect Dedup unique to Console
+ dag.addStream("Dedup Unique to Console", dedup.unique, consoleUnique.input);
+ // Connect Dedup duplicate to Console
+ dag.addStream("Dedup Duplicate to Console", dedup.duplicate, consoleDuplicate.input);
+ // Connect Dedup expired to Console
+ dag.addStream("Dedup Expired to Console", dedup.expired, consoleExpired.input);
+
+ // Set Attribute TUPLE_CLASS for supplying schema information to the port
+ dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class);
+
+ // Uncomment the following line to create multiple partitions for Dedup operator. In this case: 2
+ // dag.setAttribute(dedup, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TimeBasedDedupOperator>(2));
+ }
+
+ public static class RandomDataGeneratorOperator extends BaseOperator implements InputOperator
+ {
+
+ public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();
+ private final transient Random r = new Random();
+ private int tuplesPerWindow = 100;
+ private transient int count = 0;
+
+ @Override
+ public void beginWindow(long windowId) {
+ count = 0;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ if (count++ > tuplesPerWindow) {
+ return;
+ }
+ TestEvent event = new TestEvent();
+ event.id = r.nextInt(100);
+ event.eventTime = new Date(System.currentTimeMillis() - (r.nextInt(60 * 1000)));
+ output.emit(event);
+ }
+ }
+
+ public static class TestEvent
+ {
+ private int id;
+ private Date eventTime;
+
+ public TestEvent()
+ {
+ }
+
+ public int getId()
+ {
+ return id;
+ }
+
+ public void setId(int id)
+ {
+ this.id = id;
+ }
+
+ public Date getEventTime()
+ {
+ return eventTime;
+ }
+
+ public void setEventTime(Date eventTime)
+ {
+ this.eventTime = eventTime;
+ }
+
+ @Override
+ public String toString() {
+ return "TestEvent [id=" + id + ", eventTime=" + eventTime + "]";
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dedup/src/test/java/com/example/dedup/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/dedup/src/test/java/com/example/dedup/ApplicationTest.java b/examples/dedup/src/test/java/com/example/dedup/ApplicationTest.java
deleted file mode 100644
index 9c9f17c..0000000
--- a/examples/dedup/src/test/java/com/example/dedup/ApplicationTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Put your copyright and license info here.
- */
-package com.example.dedup;
-
-import java.io.IOException;
-
-import javax.validation.ConstraintViolationException;
-
-import org.junit.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
-import com.datatorrent.api.LocalMode;
-import com.example.dedup.Application;
-
-/**
- * Test the DAG declaration in local mode.
- */
-public class ApplicationTest {
-
- @Test
- public void testApplication() throws IOException, Exception {
- try {
- LocalMode lma = LocalMode.newInstance();
- Configuration conf = new Configuration(false);
- conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
- lma.prepareDAG(new Application(), conf);
- LocalMode.Controller lc = lma.getController();
- lc.runAsync();
- Thread.sleep(10 * 1000);
- lc.shutdown();
- } catch (ConstraintViolationException e) {
- Assert.fail("constraint violations: " + e.getConstraintViolations());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dedup/src/test/java/org/apache/apex/examples/dedup/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/dedup/src/test/java/org/apache/apex/examples/dedup/ApplicationTest.java b/examples/dedup/src/test/java/org/apache/apex/examples/dedup/ApplicationTest.java
new file mode 100644
index 0000000..3304a04
--- /dev/null
+++ b/examples/dedup/src/test/java/org/apache/apex/examples/dedup/ApplicationTest.java
@@ -0,0 +1,37 @@
+/**
+ * Put your copyright and license info here.
+ */
+package org.apache.apex.examples.dedup;
+
+import java.io.IOException;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class ApplicationTest {
+
+ @Test
+ public void testApplication() throws IOException, Exception {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ lma.prepareDAG(new Application(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+ Thread.sleep(10 * 1000);
+ lc.shutdown();
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dynamic-partition/pom.xml
----------------------------------------------------------------------
diff --git a/examples/dynamic-partition/pom.xml b/examples/dynamic-partition/pom.xml
index 34e91ee..21b1c30 100644
--- a/examples/dynamic-partition/pom.xml
+++ b/examples/dynamic-partition/pom.xml
@@ -1,273 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
-
- <groupId>com.example</groupId>
- <version>1.0-SNAPSHOT</version>
- <artifactId>dynamic-partition</artifactId>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.7.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>malhar-examples-dynamic-partition</artifactId>
<packaging>jar</packaging>
<!-- change these to the appropriate values -->
<name>Dynamic Partitioning</name>
<description>Example showing dynamic partitioning</description>
- <properties>
- <!-- change this if you desire to use a different version of Apex Core -->
- <apex.version>3.5.0</apex.version>
- <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.9</version>
- <configuration>
- <downloadSources>true</downloadSources>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.3</version>
- <configuration>
- <encoding>UTF-8</encoding>
- <source>1.7</source>
- <target>1.7</target>
- <debug>true</debug>
- <optimize>false</optimize>
- <showDeprecation>true</showDeprecation>
- <showWarnings>true</showWarnings>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>2.8</version>
- <executions>
- <execution>
- <id>copy-dependencies</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>target/deps</outputDirectory>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <executions>
- <execution>
- <id>app-package-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
- <appendAssemblyId>false</appendAssemblyId>
- <descriptors>
- <descriptor>src/assemble/appPackage.xml</descriptor>
- </descriptors>
- <archiverConfig>
- <defaultDirectoryMode>0755</defaultDirectoryMode>
- </archiverConfig>
- <archive>
- <manifestEntries>
- <Class-Path>${apex.apppackage.classpath}</Class-Path>
- <DT-Engine-Version>${apex.version}</DT-Engine-Version>
- <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
- <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
- <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
- <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
- <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
- </manifestEntries>
- </archive>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <execution>
- <phase>package</phase>
- <configuration>
- <target>
- <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
- tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
- </target>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- <execution>
- <!-- create resource directory for xml javadoc-->
- <id>createJavadocDirectory</id>
- <phase>generate-resources</phase>
- <configuration>
- <tasks>
- <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
- </tasks>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.9.1</version>
- <executions>
- <execution>
- <id>attach-artifacts</id>
- <phase>package</phase>
- <goals>
- <goal>attach-artifact</goal>
- </goals>
- <configuration>
- <artifacts>
- <artifact>
- <file>target/${project.artifactId}-${project.version}.apa</file>
- <type>apa</type>
- </artifact>
- </artifacts>
- <skipAttach>false</skipAttach>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- generate javdoc -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <executions>
- <!-- generate xml javadoc -->
- <execution>
- <id>xml-doclet</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>javadoc</goal>
- </goals>
- <configuration>
- <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
- <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
- <useStandardDocletOptions>false</useStandardDocletOptions>
- <docletArtifact>
- <groupId>com.github.markusbernhardt</groupId>
- <artifactId>xml-doclet</artifactId>
- <version>1.0.4</version>
- </docletArtifact>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>xml-maven-plugin</artifactId>
- <version>1.0</version>
- <executions>
- <execution>
- <id>transform-xmljavadoc</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>transform</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <transformationSets>
- <transformationSet>
- <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
- <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
- </transformationSet>
- </transformationSets>
- </configuration>
- </plugin>
- <!-- copy xml javadoc to class jar -->
- <plugin>
- <artifactId>maven-resources-plugin</artifactId>
- <version>2.6</version>
- <executions>
- <execution>
- <id>copy-resources</id>
- <phase>process-resources</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>${basedir}/target/classes</outputDirectory>
- <resources>
- <resource>
- <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
- <includes>
- <include>${project.artifactId}-${project.version}-javadoc.xml</include>
- </includes>
- <filtering>true</filtering>
- </resource>
- </resources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- </plugins>
-
- </build>
-
<dependencies>
- <!-- add your dependencies here -->
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-library</artifactId>
- <version>3.6.0</version>
- <!--
- If you know that your application does not need transitive dependencies pulled in by malhar-library,
- uncomment the following to reduce the size of your app package.
- -->
- <!--
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- -->
- </dependency>
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>apex-common</artifactId>
- <version>${apex.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.10</version>
- <scope>test</scope>
- </dependency>
<dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>apex-engine</artifactId>
- <version>${apex.version}</version>
- <scope>test</scope>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ <version>2.24.0</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dynamic-partition/src/main/java/com/example/dynamic/App.java
----------------------------------------------------------------------
diff --git a/examples/dynamic-partition/src/main/java/com/example/dynamic/App.java b/examples/dynamic-partition/src/main/java/com/example/dynamic/App.java
deleted file mode 100644
index 9eec263..0000000
--- a/examples/dynamic-partition/src/main/java/com/example/dynamic/App.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.example.dynamic;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.DAG;
-
-import com.datatorrent.lib.stream.DevNull;
-
-@ApplicationAnnotation(name="Dyn")
-public class App implements StreamingApplication
-{
-
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
- Gen gen = dag.addOperator("gen", Gen.class);
- DevNull devNull = dag.addOperator("devNull", DevNull.class);
-
- dag.addStream("data", gen.out, devNull.data);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dynamic-partition/src/main/java/com/example/dynamic/Gen.java
----------------------------------------------------------------------
diff --git a/examples/dynamic-partition/src/main/java/com/example/dynamic/Gen.java b/examples/dynamic-partition/src/main/java/com/example/dynamic/Gen.java
deleted file mode 100644
index 4cccd23..0000000
--- a/examples/dynamic-partition/src/main/java/com/example/dynamic/Gen.java
+++ /dev/null
@@ -1,169 +0,0 @@
-package com.example.dynamic;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.validation.constraints.NotNull;
-import java.io.ByteArrayOutputStream;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultPartition;
-import com.datatorrent.api.Partitioner;
-import com.datatorrent.api.StatsListener;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * Operator that dynamically partitions itself after 500 tuples have been emitted
- */
-public class Gen extends BaseOperator implements InputOperator, Partitioner<Gen>, StatsListener
-{
- private static final Logger LOG = LoggerFactory.getLogger(Gen.class);
-
- private static final int MAX_PARTITIONS = 4; // maximum number of partitions
-
- private int partitions = 2; // initial number of partitions
-
- @NotNull
- private int numTuples; // number of tuples to emit per window
-
- private transient int count = 0;
-
- public final transient DefaultOutputPort<Double> out = new DefaultOutputPort<Double>();
-
- @Override
- public void partitioned(Map<Integer, Partition<Gen>> map)
- {
- if (partitions != map.size()) {
- String msg = String.format("partitions = %d, map.size = %d%n", partitions, map.size());
- throw new RuntimeException(msg);
- }
- }
-
- @Override
- public void beginWindow(long windowId)
- {
- count = 0;
- }
-
- @Override
- public void emitTuples()
- {
- if (count < numTuples) {
- ++count;
- out.emit(Math.random());
- }
- }
-
- public int getNumTuples()
- {
- return numTuples;
- }
-
- /**
- * Sets the number of tuples to be emitted every window.
- * @param numTuples number of tuples
- */
- public void setNumTuples(int numTuples)
- {
- this.numTuples = numTuples;
- }
-
- @Override
- public Response processStats(BatchedOperatorStats batchedOperatorStats) {
-
- final long emittedCount = batchedOperatorStats.getTuplesEmittedPSMA();
-
- // we only perform a single dynamic repartition
- Response res = new Response();
- res.repartitionRequired = false;
- if (emittedCount > 500 && partitions < MAX_PARTITIONS) {
- LOG.info("processStats: trying repartition of input operator current {} required {}",
- partitions, MAX_PARTITIONS);
- LOG.info("**** operator id = {}, window id = {}, tuplesProcessedPSMA = {}, tuplesEmittedPSMA = {}",
- batchedOperatorStats.getOperatorId(),
- batchedOperatorStats.getCurrentWindowId(),
- batchedOperatorStats.getTuplesProcessedPSMA(),
- emittedCount);
- partitions = MAX_PARTITIONS;
- res.repartitionRequired = true;
- }
-
- return res;
- } // processStats
-
- /**
- * Clone object by serializing and deserializing using Kryo.
- * Note this is different from using {@link Kryo#copy(Object)}, which will attempt to also clone transient fields.
- *
- * @param kryo kryo object used to clone objects
- * @param src src object that copy from
- * @return cloned object
- */
- @SuppressWarnings("unchecked")
- private static <SRC> SRC cloneObject(Kryo kryo, SRC src)
- {
- kryo.setClassLoader(src.getClass().getClassLoader());
- ByteArrayOutputStream bos = null;
- Output output;
- Input input = null;
- try {
- bos = new ByteArrayOutputStream();
- output = new Output(bos);
- kryo.writeObject(output, src);
- output.close();
- input = new Input(bos.toByteArray());
- return (SRC)kryo.readObject(input, src.getClass());
- } finally {
- IOUtils.closeQuietly(input);
- IOUtils.closeQuietly(bos);
- }
- }
-
- @Override
- public Collection<Partition<Gen>> definePartitions(
- Collection<Partition<Gen>> list, PartitioningContext context)
- {
- if (partitions < 0) { // error
- String msg = String.format("Error: Bad value: partitions = %d%n", partitions);
- LOG.error(msg);
- throw new RuntimeException(msg);
- }
-
- final int prevCount = list.size();
- if (1 == prevCount) { // initial call
- LOG.info("definePartitions: First call, prevCount = {}, partitions = {}",
- prevCount, partitions);
- }
-
- if (prevCount == partitions) {
- LOG.info("definePartitions: Nothing to do in definePartitions");
- return list; // nothing to do
- }
-
- LOG.debug("definePartitions: Repartitioning from {} to {}", prevCount, partitions);
-
- Kryo kryo = new Kryo();
-
- // return value: new list of partitions (includes old list)
- List<Partition<Gen>> newPartitions = Lists.newArrayListWithExpectedSize(partitions);
-
- for (int i = 0; i < partitions; i++) {
- Gen oper = cloneObject(kryo, this);
- newPartitions.add(new DefaultPartition<>(oper));
- }
-
- LOG.info("definePartition: returning {} partitions", newPartitions.size());
- return newPartitions;
- }
-
-}