You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/03/11 08:36:57 UTC
[incubator-nemo] branch master updated: [NEMO-353] Launch NEXMark applications (#198)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 70978e7 [NEMO-353] Launch NEXMark applications (#198)
70978e7 is described below
commit 70978e7ad676263ca8c3b9cc7358cd80aa99d51b
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Mon Mar 11 17:36:53 2019 +0900
[NEMO-353] Launch NEXMark applications (#198)
JIRA: [NEMO-353: Launch NEXMark applications](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-353)
**Major changes:**
- Remove the nemo-client dependency from nemo-compiler-frontend-beam (the Beam runner classes move into nemo-client) and build a shaded nemo-client jar that is placed on the classpath when executing NEXMark jobs.
- Create an example package that contains NEXMark jar file.
- Fix NullPointerException in `BeamUnboundedSourceVertex` when executing NEXMark applications.
**Minor changes to note:**
- Update README
---
README.md | 11 +++
bin/run_beam.sh | 6 +-
bin/{run_beam.sh => run_nexmark.sh} | 6 +-
client/pom.xml | 51 ++++++++++++
.../nemo/client}/beam/BeamStateTranslator.java | 2 +-
.../nemo/client}/beam/NemoPipelineResult.java | 2 +-
.../org/apache/nemo/client}/beam/NemoRunner.java | 4 +-
.../nemo/client}/beam/NemoRunnerRegistrar.java | 3 +-
compiler/frontend/beam/pom.xml | 95 ++++++++++------------
.../frontend/beam/InMemorySideInputReader.java | 2 +-
.../compiler/frontend/beam/PipelineVisitor.java | 4 +-
.../beam/source/BeamUnboundedSourceVertex.java | 14 +++-
examples/beam/pom.xml | 6 ++
.../nemo/examples/beam/AlternatingLeastSquare.java | 7 +-
.../beam/AlternatingLeastSquareInefficient.java | 7 +-
.../org/apache/nemo/examples/beam/Broadcast.java | 5 +-
.../nemo/examples/beam/MinimalWordCount.java | 6 +-
.../beam/MultinomialLogisticRegression.java | 5 +-
.../examples/beam/NemoPipelineOptionsFactory.java | 49 +++++++++++
.../nemo/examples/beam/NetworkTraceAnalysis.java | 6 +-
.../nemo/examples/beam/PartitionWordsByLength.java | 6 +-
.../apache/nemo/examples/beam/PerKeyMedian.java | 6 +-
.../nemo/examples/beam/PerPercentileAverage.java | 6 +-
.../apache/nemo/examples/beam/SimpleSumSQL.java | 6 +-
.../nemo/examples/beam/WindowedBroadcast.java | 6 +-
.../nemo/examples/beam/WindowedWordCount.java | 6 +-
.../org/apache/nemo/examples/beam/WordCount.java | 6 +-
examples/nexmark/pom.xml | 75 +++++++++++++++++
examples/pom.xml | 1 +
29 files changed, 281 insertions(+), 128 deletions(-)
diff --git a/README.md b/README.md
index 383720b..ad27484 100644
--- a/README.md
+++ b/README.md
@@ -95,6 +95,16 @@ Below describes how Beam applications can be run directly on Nemo.
-user_main org.apache.nemo.examples.beam.WordCount \
-optimization_policy org.apache.nemo.compiler.optimizer.policy.TransientResourcePolicy \
-user_args "hdfs://v-m:9000/test_input_wordcount hdfs://v-m:9000/test_output_wordcount"
+
+## NEXMark streaming Q0 (query0) example
+./bin/run_nexmark.sh \
+ -job_id nexmark-Q0 \
+ -executor_json `pwd`/examples/resources/executors/beam_test_executor_resources.json \
+ -user_main org.apache.beam.sdk.nexmark.Main \
+ -optimization_policy org.apache.nemo.compiler.optimizer.policy.StreamingPolicy \
+ -scheduler_impl_class_name org.apache.nemo.runtime.master.scheduler.StreamingScheduler \
+ -user_args "--runner=org.apache.nemo.client.beam.NemoRunner --streaming=true --query=0 --numEventGenerators=1"
+
```
## Resource Configuration
`-executor_json` command line option can be used to provide a path to the JSON file that describes resource configuration for executors. Its default value is `config/default.json`, which initializes one of each `Transient`, `Reserved`, and `Compute` executor, each of which has one core and 1024MB memory.
@@ -148,3 +158,4 @@ Nemo Compiler and Engine can store JSON representation of intermediate DAGs.
## Speeding up builds
* To exclude Spark related packages: mvn clean install -T 2C -DskipTests -pl \\!compiler/frontend/spark,\\!examples/spark
* To exclude Beam related packages: mvn clean install -T 2C -DskipTests -pl \\!compiler/frontend/beam,\\!examples/beam
+* To exclude NEXMark related packages: mvn clean install -T 2C -DskipTests -pl \\!examples/nexmark
diff --git a/bin/run_beam.sh b/bin/run_beam.sh
index cbd082c..59b5a67 100755
--- a/bin/run_beam.sh
+++ b/bin/run_beam.sh
@@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.
-java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/beam/target/nemo-examples-beam-$(mvn -q \
+VERSION=$(mvn -q \
-Dexec.executable=echo -Dexec.args='${project.version}' \
- --non-recursive exec:exec)-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
+ --non-recursive exec:exec)
+
+java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp client/target/nemo-client-$VERSION-shaded.jar:examples/beam/target/nemo-examples-beam-$VERSION-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
diff --git a/bin/run_beam.sh b/bin/run_nexmark.sh
similarity index 76%
copy from bin/run_beam.sh
copy to bin/run_nexmark.sh
index cbd082c..d1b6a2c 100755
--- a/bin/run_beam.sh
+++ b/bin/run_nexmark.sh
@@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.
-java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/beam/target/nemo-examples-beam-$(mvn -q \
+VERSION=$(mvn -q \
-Dexec.executable=echo -Dexec.args='${project.version}' \
- --non-recursive exec:exec)-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
+ --non-recursive exec:exec)
+
+java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp client/target/nemo-client-$VERSION-shaded.jar:`yarn classpath`:examples/nexmark/target/nexmark-$VERSION-shaded.jar org.apache.nemo.client.JobLauncher "$@"
diff --git a/client/pom.xml b/client/pom.xml
index bcfe8a2..c9439d2 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -56,5 +56,56 @@ under the License.
<artifactId>reef-runtime-yarn</artifactId>
<version>${reef.version}</version>
</dependency>
+
+
+ <!-- for nemo-beam-runner -->
+ <dependency>
+ <groupId>org.apache.nemo</groupId>
+ <artifactId>nemo-compiler-frontend-beam</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <version>${auto-service.version}</version>
+ <optional>true</optional>
+ </dependency>
</dependencies>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>2.8.2</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <outputFile>
+ ${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar
+ </outputFile>
+ <transformers>
+ <!-- Required for using beam-hadoop: See https://stackoverflow.com/questions/44365545
+ -->
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" />
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamStateTranslator.java b/client/src/main/java/org/apache/nemo/client/beam/BeamStateTranslator.java
similarity index 97%
rename from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamStateTranslator.java
rename to client/src/main/java/org/apache/nemo/client/beam/BeamStateTranslator.java
index 98ae203..c4c69e4 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamStateTranslator.java
+++ b/client/src/main/java/org/apache/nemo/client/beam/BeamStateTranslator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.compiler.frontend.beam;
+package org.apache.nemo.client.beam;
import org.apache.nemo.client.StateTranslator;
import org.apache.nemo.runtime.common.state.PlanState;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java b/client/src/main/java/org/apache/nemo/client/beam/NemoPipelineResult.java
similarity index 97%
rename from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
rename to client/src/main/java/org/apache/nemo/client/beam/NemoPipelineResult.java
index 1cfbf7a..512f8f1 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
+++ b/client/src/main/java/org/apache/nemo/client/beam/NemoPipelineResult.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.compiler.frontend.beam;
+package org.apache.nemo.client.beam;
import org.apache.nemo.client.ClientEndpoint;
import org.apache.beam.sdk.PipelineResult;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunner.java b/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java
similarity index 94%
rename from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunner.java
rename to client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java
index 9128213..ccafc3e 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunner.java
+++ b/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.compiler.frontend.beam;
+package org.apache.nemo.client.beam;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.nemo.client.JobLauncher;
@@ -24,6 +24,8 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.PipelineVisitor;
/**
* Runner class for BEAM programs.
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunnerRegistrar.java b/client/src/main/java/org/apache/nemo/client/beam/NemoRunnerRegistrar.java
similarity index 95%
rename from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunnerRegistrar.java
rename to client/src/main/java/org/apache/nemo/client/beam/NemoRunnerRegistrar.java
index aa05519..832bcd6 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunnerRegistrar.java
+++ b/client/src/main/java/org/apache/nemo/client/beam/NemoRunnerRegistrar.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.compiler.frontend.beam;
+package org.apache.nemo.client.beam;
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
/**
* Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the {@link NemoRunner}.
diff --git a/compiler/frontend/beam/pom.xml b/compiler/frontend/beam/pom.xml
index 47c6c31..c999fff 100644
--- a/compiler/frontend/beam/pom.xml
+++ b/compiler/frontend/beam/pom.xml
@@ -7,9 +7,7 @@ regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
-
http://www.apache.org/licenses/LICENSE-2.0
-
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,56 +16,51 @@ specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nemo</groupId>
+ <artifactId>nemo-compiler</artifactId>
+ <version>0.2-SNAPSHOT</version>
+ <relativePath>../../</relativePath>
+ </parent>
+
+ <artifactId>nemo-compiler-frontend-beam</artifactId>
+ <name>Nemo Compiler Frontend: Beam</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nemo</groupId>
+ <artifactId>nemo-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <version>${beam.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-core-java</artifactId>
+ <version>${beam.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
+ <version>${beam.version}</version>
+ </dependency>
- <parent>
- <groupId>org.apache.nemo</groupId>
- <artifactId>nemo-compiler</artifactId>
- <version>0.2-SNAPSHOT</version>
- <relativePath>../../</relativePath>
- </parent>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk -->
- <artifactId>nemo-compiler-frontend-beam</artifactId>
- <name>Nemo Compiler Frontend: Beam</name>
- <packaging>jar</packaging>
- <dependencies>
- <dependency>
- <groupId>org.apache.nemo</groupId>
- <artifactId>nemo-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.nemo</groupId>
- <artifactId>nemo-client</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-core</artifactId>
- <version>${beam.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-core-java</artifactId>
- <version>${beam.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
- <version>${beam.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <version>${hadoop.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.google.auto.service</groupId>
- <artifactId>auto-service</artifactId>
- <version>${auto-service.version}</version>
- <optional>true</optional>
- </dependency>
- </dependencies>
+ </dependencies>
</project>
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
index 5383a2c..f1bb460 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
@@ -92,7 +92,7 @@ public final class InMemorySideInputReader implements ReadyCheckingSideInputRead
/**
* Say a DoFn of this reader has 3 main inputs and 4 side inputs.
- * {@link org.apache.nemo.runtime.executor.datatransfer.InputWatermarkManager} guarantees that the watermark here
+ * Nemo runtime guarantees that the watermark here
* is the minimum of the all 7 input streams.
* @param newWatermark to set.
*/
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineVisitor.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineVisitor.java
index 6c02088..fa3d00d 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineVisitor.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineVisitor.java
@@ -35,7 +35,7 @@ public final class PipelineVisitor extends Pipeline.PipelineVisitor.Defaults {
* @param pipeline to visit.
* @param pipelineOptions pipeline options.
*/
- PipelineVisitor(final Pipeline pipeline, final NemoPipelineOptions pipelineOptions) {
+ public PipelineVisitor(final Pipeline pipeline, final NemoPipelineOptions pipelineOptions) {
this.context = new PipelineTranslationContext(pipeline, pipelineOptions);
}
@@ -61,7 +61,7 @@ public final class PipelineVisitor extends Pipeline.PipelineVisitor.Defaults {
/**
* @return the converted pipeline.
*/
- IRDAG getConvertedPipeline() {
+ public IRDAG getConvertedPipeline() {
return new IRDAG(context.getBuilder().build());
}
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
index d082c2a..c2080cf 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.SourceVertex;
+import org.apache.nemo.common.test.EmptyComponents;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,10 +82,17 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
@Override
public List<Readable<Object>> getReadables(final int desiredNumOfSplits) throws Exception {
+
final List<Readable<Object>> readables = new ArrayList<>();
- source.split(desiredNumOfSplits, null)
- .forEach(unboundedSource -> readables.add(new UnboundedSourceReadable<>(unboundedSource)));
- return readables;
+ if (source != null) {
+ source.split(desiredNumOfSplits, null)
+ .forEach(unboundedSource -> readables.add(new UnboundedSourceReadable<>(unboundedSource)));
+ return readables;
+ } else {
+ // TODO #333: Remove SourceVertex#clearInternalStates
+ final SourceVertex emptySourceVertex = new EmptyComponents.EmptySourceVertex("EMPTY");
+ return emptySourceVertex.getReadables(desiredNumOfSplits);
+ }
}
@Override
diff --git a/examples/beam/pom.xml b/examples/beam/pom.xml
index 3388d99..2748249 100644
--- a/examples/beam/pom.xml
+++ b/examples/beam/pom.xml
@@ -37,6 +37,12 @@ under the License.
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.nemo</groupId>
+ <artifactId>nemo-client</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.github.fommil.netlib</groupId>
<artifactId>all</artifactId>
<version>${netlib.version}</version>
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
index 7407bfd..84dcc93 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
@@ -20,12 +20,10 @@ package org.apache.nemo.examples.beam;
import com.github.fommil.netlib.BLAS;
import com.github.fommil.netlib.LAPACK;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -353,7 +351,7 @@ public final class AlternatingLeastSquare {
* Main function for the ALS BEAM program.
* @param args arguments.
*/
- public static void main(final String[] args) {
+ public static void main(final String[] args) throws ClassNotFoundException {
final Long start = System.currentTimeMillis();
LOG.info(Arrays.toString(args));
final String inputFilePath = args[0];
@@ -374,8 +372,7 @@ public final class AlternatingLeastSquare {
outputFilePath = "";
}
- final PipelineOptions options = PipelineOptionsFactory.create();
- options.setRunner(NemoRunner.class);
+ final PipelineOptions options = NemoPipelineOptionsFactory.create();
options.setJobName("ALS");
options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
index 5af8dd1..06588e6 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
@@ -19,11 +19,9 @@
package org.apache.nemo.examples.beam;
import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
@@ -102,7 +100,7 @@ public final class AlternatingLeastSquareInefficient {
* Main function for the ALS BEAM program.
* @param args arguments.
*/
- public static void main(final String[] args) {
+ public static void main(final String[] args) throws ClassNotFoundException {
final Long start = System.currentTimeMillis();
LOG.info(Arrays.toString(args));
final String inputFilePath = args[0];
@@ -115,8 +113,7 @@ public final class AlternatingLeastSquareInefficient {
lambda = 0.05;
}
- final PipelineOptions options = PipelineOptionsFactory.create();
- options.setRunner(NemoRunner.class);
+ final PipelineOptions options = NemoPipelineOptionsFactory.create();
options.setJobName("ALS");
options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
index 5ded58b..522eee2 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
@@ -18,10 +18,8 @@
*/
package org.apache.nemo.examples.beam;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
@@ -48,8 +46,7 @@ public final class Broadcast {
public static void main(final String[] args) {
final String inputFilePath = args[0];
final String outputFilePath = args[1];
- final PipelineOptions options = PipelineOptionsFactory.create();
- options.setRunner(NemoRunner.class);
+ final PipelineOptions options = NemoPipelineOptionsFactory.create();
final Pipeline p = Pipeline.create(options);
final PCollection<String> elemCollection = GenericSourceSink.read(p, inputFilePath);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
index 5549d10..2b595f6 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
@@ -20,15 +20,12 @@ package org.apache.nemo.examples.beam;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import java.util.Arrays;
/**
@@ -47,8 +44,7 @@ public final class MinimalWordCount {
public static void main(final String[] args) {
final String inputFilePath = args[0];
final String outputFilePath = args[1];
- final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoRunner.class);
+ final PipelineOptions options = NemoPipelineOptionsFactory.create();
options.setJobName("MinimalWordCount");
// Create the Pipeline object with the options we defined above
final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
index 2b37eba..7a671fa 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
@@ -19,11 +19,9 @@
package org.apache.nemo.examples.beam;
import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.nemo.common.Pair;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -419,8 +417,7 @@ public final class MultinomialLogisticRegression {
initialModelKeys.add(i);
}
- final PipelineOptions options = PipelineOptionsFactory.create();
- options.setRunner(NemoRunner.class);
+ final PipelineOptions options = NemoPipelineOptionsFactory.create();
options.setJobName("MLR");
options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/NemoPipelineOptionsFactory.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NemoPipelineOptionsFactory.java
new file mode 100644
index 0000000..000d238
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NemoPipelineOptionsFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+
+/**
+ * NemoPipelineOptionsFactory that creates nemo pipeline options.
+ */
+final class NemoPipelineOptionsFactory {
+
+ private NemoPipelineOptionsFactory() {
+ }
+
+ /**
+ * Create a PipelineOptions for nemo runner.
+ * @return pipeline options
+ */
+ public static PipelineOptions create() {
+ final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+ try {
+ options.setRunner((Class<? extends PipelineRunner<?>>)
+ Class.forName("org.apache.nemo.client.beam.NemoRunner"));
+ return options;
+ } catch (final ClassNotFoundException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
index af6cff6..e2a829d 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
@@ -18,11 +18,8 @@
*/
package org.apache.nemo.examples.beam;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -58,8 +55,7 @@ public final class NetworkTraceAnalysis {
final String input0FilePath = args[0];
final String input1FilePath = args[1];
final String outputFilePath = args[2];
- final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoRunner.class);
+ final PipelineOptions options = NemoPipelineOptionsFactory.create();
options.setJobName("NetworkTraceAnalysis");
// Given "4 0.0 192.168.3.1 -> 192.168.0.2 Len=29", this finds "192.168.3.1", "192.168.0.2" and "29"
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
index 816be62..c4bf5d8 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
@@ -18,11 +18,8 @@
*/
package org.apache.nemo.examples.beam;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;
@@ -46,8 +43,7 @@ public final class PartitionWordsByLength {
public static void main(final String[] args) {
final String inputFilePath = args[0];
final String outputFilePath = args[1];
- final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoRunner.class);
+ final PipelineOptions options = NemoPipelineOptionsFactory.create();
options.setJobName("PartitionWordsByLength");
// {} here is required for preserving type information.
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
index 6255a93..268f4bc 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
@@ -18,11 +18,8 @@
*/
package org.apache.nemo.examples.beam;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -52,8 +49,7 @@ public final class PerKeyMedian {
public static void main(final String[] args) {
final String inputFilePath = args[0];
final String outputFilePath = args[1];
- final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoRunner.class);
+ final PipelineOptions options = NemoPipelineOptionsFactory.create();
options.setJobName("PerKeyMedian");
final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
index 1f3b700..e46946f 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
@@ -19,12 +19,9 @@
package org.apache.nemo.examples.beam;
import com.google.common.collect.Lists;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -51,8 +48,7 @@ public final class PerPercentileAverage {
public static void main(final String[] args) {
final String inputFilePath = args[0];
final String outputFilePath = args[1];
- final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoRunner.class);
+ final PipelineOptions options = NemoPipelineOptionsFactory.create();
options.setJobName("PerPercentileAverage");
final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
index 6fa8f5e..84e20fd 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
@@ -21,12 +21,9 @@ package org.apache.nemo.examples.beam;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import java.util.List;
import java.util.stream.Collectors;
@@ -49,8 +46,7 @@ public final class SimpleSumSQL {
public static void main(final String[] args) {
final String outputFilePath = args[0];
- final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoRunner.class);
+ final PipelineOptions options = NemoPipelineOptionsFactory.create();
options.setJobName("SimpleSumSQL");
final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
index d26e102..d01a41e 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
@@ -21,14 +21,11 @@ package org.apache.nemo.examples.beam;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -65,8 +62,7 @@ public final class WindowedBroadcast {
.<Long>into(SlidingWindows.of(Duration.standardSeconds(2))
.every(Duration.standardSeconds(1)));
- final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoRunner.class);
+ final PipelineOptions options = NemoPipelineOptionsFactory.create();
options.setJobName("WindowedBroadcast");
final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
index 007129c..eaa3f05 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
@@ -21,15 +21,12 @@ package org.apache.nemo.examples.beam;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -110,8 +107,7 @@ public final class WindowedWordCount {
.every(Duration.standardSeconds(5)));
}
- final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoRunner.class);
+ final PipelineOptions options = NemoPipelineOptionsFactory.create();
options.setJobName("WindowedWordCount");
final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
index ba3cb80..6ffaa2e 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
@@ -18,11 +18,8 @@
*/
package org.apache.nemo.examples.beam;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -44,8 +41,7 @@ public final class WordCount {
public static void main(final String[] args) {
final String inputFilePath = args[0];
final String outputFilePath = args[1];
- final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoRunner.class);
+ final PipelineOptions options = NemoPipelineOptionsFactory.create();
options.setJobName("WordCount");
final Pipeline p = Pipeline.create(options);
diff --git a/examples/nexmark/pom.xml b/examples/nexmark/pom.xml
new file mode 100644
index 0000000..23a4ee1
--- /dev/null
+++ b/examples/nexmark/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>nemo-examples</artifactId>
+ <groupId>org.apache.nemo</groupId>
+ <version>0.2-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>nexmark</artifactId>
+ <name>Nemo Examples: NEXMark</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-nexmark</artifactId>
+ <version>${beam.version}</version>
+ </dependency>
+
+ <!-- Beam Google Cloud Platform IO artifact, see https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-google-cloud-platform (NOTE(review): presumably needed by the NEXMark jobs at runtime; confirm) -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+ <version>${beam.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin> <!-- Runs the shade goal in the package phase; the shaded jar is written to the outputFile path configured below. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <outputFile>
+ ${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar
+ </outputFile>
+ <transformers>
+ <!-- Original note: "Required for using beam-hadoop", see https://stackoverflow.com/questions/44365545
+ NOTE(review): the transformer configured below is ManifestResourceTransformer; confirm it is the one that workaround needs. -->
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/examples/pom.xml b/examples/pom.xml
index ed66ccb..8be9bd1 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -32,6 +32,7 @@ under the License.
<modules>
<module>beam</module>
<module>spark</module>
+ <module>nexmark</module>
</modules>