You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/03/11 08:36:57 UTC

[incubator-nemo] branch master updated: [NEMO-353] Launch NEXMark applications (#198)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 70978e7  [NEMO-353] Launch NEXMark applications (#198)
70978e7 is described below

commit 70978e7ad676263ca8c3b9cc7358cd80aa99d51b
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Mon Mar 11 17:36:53 2019 +0900

    [NEMO-353] Launch NEXMark applications (#198)
    
    JIRA: [NEMO-353: Launch NEXMark applications](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-353)
    
    **Major changes:**
    - Remove nemo-client dependency from nemo-compiler-frontend-beam and create shaded-client.jar file to include it for the execution of NEXMark jobs.
    - Create an example package that contains NEXMark jar file.
    - Fix NullPointerException in `BeamUnboundedSourceVertex` when executing NEXMark applications.
    
    **Minor changes to note:**
    -Update README
---
 README.md                                          | 11 +++
 bin/run_beam.sh                                    |  6 +-
 bin/{run_beam.sh => run_nexmark.sh}                |  6 +-
 client/pom.xml                                     | 51 ++++++++++++
 .../nemo/client}/beam/BeamStateTranslator.java     |  2 +-
 .../nemo/client}/beam/NemoPipelineResult.java      |  2 +-
 .../org/apache/nemo/client}/beam/NemoRunner.java   |  4 +-
 .../nemo/client}/beam/NemoRunnerRegistrar.java     |  3 +-
 compiler/frontend/beam/pom.xml                     | 95 ++++++++++------------
 .../frontend/beam/InMemorySideInputReader.java     |  2 +-
 .../compiler/frontend/beam/PipelineVisitor.java    |  4 +-
 .../beam/source/BeamUnboundedSourceVertex.java     | 14 +++-
 examples/beam/pom.xml                              |  6 ++
 .../nemo/examples/beam/AlternatingLeastSquare.java |  7 +-
 .../beam/AlternatingLeastSquareInefficient.java    |  7 +-
 .../org/apache/nemo/examples/beam/Broadcast.java   |  5 +-
 .../nemo/examples/beam/MinimalWordCount.java       |  6 +-
 .../beam/MultinomialLogisticRegression.java        |  5 +-
 .../examples/beam/NemoPipelineOptionsFactory.java  | 49 +++++++++++
 .../nemo/examples/beam/NetworkTraceAnalysis.java   |  6 +-
 .../nemo/examples/beam/PartitionWordsByLength.java |  6 +-
 .../apache/nemo/examples/beam/PerKeyMedian.java    |  6 +-
 .../nemo/examples/beam/PerPercentileAverage.java   |  6 +-
 .../apache/nemo/examples/beam/SimpleSumSQL.java    |  6 +-
 .../nemo/examples/beam/WindowedBroadcast.java      |  6 +-
 .../nemo/examples/beam/WindowedWordCount.java      |  6 +-
 .../org/apache/nemo/examples/beam/WordCount.java   |  6 +-
 examples/nexmark/pom.xml                           | 75 +++++++++++++++++
 examples/pom.xml                                   |  1 +
 29 files changed, 281 insertions(+), 128 deletions(-)

diff --git a/README.md b/README.md
index 383720b..ad27484 100644
--- a/README.md
+++ b/README.md
@@ -95,6 +95,16 @@ Below describes how Beam applications can be run directly on Nemo.
  	-user_main org.apache.nemo.examples.beam.WordCount \
  	-optimization_policy org.apache.nemo.compiler.optimizer.policy.TransientResourcePolicy \
 	-user_args "hdfs://v-m:9000/test_input_wordcount hdfs://v-m:9000/test_output_wordcount"
+
+## NEXMark streaming Q0 (query0) example 
+./bin/run_nexmark.sh \
+ 	-job_id nexmark-Q0 \
+	-executor_json `pwd`/examples/resources/executors/beam_test_executor_resources.json \
+ 	-user_main org.apache.beam.sdk.nexmark.Main \
+ 	-optimization_policy org.apache.nemo.compiler.optimizer.policy.StreamingPolicy \
+  -scheduler_impl_class_name org.apache.nemo.runtime.master.scheduler.StreamingScheduler \	
+	-user_args "--runner=org.apache.nemo.client.beam.NemoRunner --streaming=true --query=0 --numEventGenerators=1"
+
 ```
 ## Resource Configuration
 `-executor_json` command line option can be used to provide a path to the JSON file that describes resource configuration for executors. Its default value is `config/default.json`, which initializes one of each `Transient`, `Reserved`, and `Compute` executor, each of which has one core and 1024MB memory.
@@ -148,3 +158,4 @@ Nemo Compiler and Engine can store JSON representation of intermediate DAGs.
 ## Speeding up builds 
 * To exclude Spark related packages: mvn clean install -T 2C -DskipTests -pl \\!compiler/frontend/spark,\\!examples/spark
 * To exclude Beam related packages: mvn clean install -T 2C -DskipTests -pl \\!compiler/frontend/beam,\\!examples/beam
+* To exclude NEXMark related packages: mvn clean install -T 2C -DskipTests -pl \\!examples/nexmark
diff --git a/bin/run_beam.sh b/bin/run_beam.sh
index cbd082c..59b5a67 100755
--- a/bin/run_beam.sh
+++ b/bin/run_beam.sh
@@ -17,6 +17,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/beam/target/nemo-examples-beam-$(mvn -q \
+VERSION=$(mvn -q \
   -Dexec.executable=echo -Dexec.args='${project.version}' \
-  --non-recursive exec:exec)-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
+  --non-recursive exec:exec)
+
+java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp client/target/nemo-client-$VERSION-shaded.jar:examples/beam/target/nemo-examples-beam-$VERSION-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
diff --git a/bin/run_beam.sh b/bin/run_nexmark.sh
similarity index 76%
copy from bin/run_beam.sh
copy to bin/run_nexmark.sh
index cbd082c..d1b6a2c 100755
--- a/bin/run_beam.sh
+++ b/bin/run_nexmark.sh
@@ -17,6 +17,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/beam/target/nemo-examples-beam-$(mvn -q \
+VERSION=$(mvn -q \
   -Dexec.executable=echo -Dexec.args='${project.version}' \
-  --non-recursive exec:exec)-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
+  --non-recursive exec:exec)
+
+java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp client/target/nemo-client-$VERSION-shaded.jar:`yarn classpath`:examples/nexmark/target/nexmark-$VERSION-shaded.jar org.apache.nemo.client.JobLauncher "$@"
diff --git a/client/pom.xml b/client/pom.xml
index bcfe8a2..c9439d2 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -56,5 +56,56 @@ under the License.
             <artifactId>reef-runtime-yarn</artifactId>
             <version>${reef.version}</version>
         </dependency>
+
+
+      <!-- for nemo-beam-runner -->
+      <dependency>
+        <groupId>org.apache.nemo</groupId>
+        <artifactId>nemo-compiler-frontend-beam</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.google.auto.service</groupId>
+        <artifactId>auto-service</artifactId>
+        <version>${auto-service.version}</version>
+        <optional>true</optional>
+      </dependency>
     </dependencies>
+
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <version>2.8.2</version>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>3.0.0</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <outputFile>
+                ${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar
+              </outputFile>
+              <transformers>
+                <!-- Required for using beam-hadoop: See https://stackoverflow.com/questions/44365545
+                -->
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" />
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
 </project>
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamStateTranslator.java b/client/src/main/java/org/apache/nemo/client/beam/BeamStateTranslator.java
similarity index 97%
rename from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamStateTranslator.java
rename to client/src/main/java/org/apache/nemo/client/beam/BeamStateTranslator.java
index 98ae203..c4c69e4 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamStateTranslator.java
+++ b/client/src/main/java/org/apache/nemo/client/beam/BeamStateTranslator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.compiler.frontend.beam;
+package org.apache.nemo.client.beam;
 
 import org.apache.nemo.client.StateTranslator;
 import org.apache.nemo.runtime.common.state.PlanState;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java b/client/src/main/java/org/apache/nemo/client/beam/NemoPipelineResult.java
similarity index 97%
rename from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
rename to client/src/main/java/org/apache/nemo/client/beam/NemoPipelineResult.java
index 1cfbf7a..512f8f1 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
+++ b/client/src/main/java/org/apache/nemo/client/beam/NemoPipelineResult.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.compiler.frontend.beam;
+package org.apache.nemo.client.beam;
 
 import org.apache.nemo.client.ClientEndpoint;
 import org.apache.beam.sdk.PipelineResult;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunner.java b/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java
similarity index 94%
rename from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunner.java
rename to client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java
index 9128213..ccafc3e 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunner.java
+++ b/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.compiler.frontend.beam;
+package org.apache.nemo.client.beam;
 
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.nemo.client.JobLauncher;
@@ -24,6 +24,8 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.PipelineVisitor;
 
 /**
  * Runner class for BEAM programs.
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunnerRegistrar.java b/client/src/main/java/org/apache/nemo/client/beam/NemoRunnerRegistrar.java
similarity index 95%
rename from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunnerRegistrar.java
rename to client/src/main/java/org/apache/nemo/client/beam/NemoRunnerRegistrar.java
index aa05519..832bcd6 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunnerRegistrar.java
+++ b/client/src/main/java/org/apache/nemo/client/beam/NemoRunnerRegistrar.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.compiler.frontend.beam;
+package org.apache.nemo.client.beam;
 
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
 import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
 
 /**
  * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the {@link NemoRunner}.
diff --git a/compiler/frontend/beam/pom.xml b/compiler/frontend/beam/pom.xml
index 47c6c31..c999fff 100644
--- a/compiler/frontend/beam/pom.xml
+++ b/compiler/frontend/beam/pom.xml
@@ -7,9 +7,7 @@ regarding copyright ownership.  The ASF licenses this file
 to you under the Apache License, Version 2.0 (the
 "License"); you may not use this file except in compliance
 with the License.  You may obtain a copy of the License at
-
   http://www.apache.org/licenses/LICENSE-2.0
-
 Unless required by applicable law or agreed to in writing,
 software distributed under the License is distributed on an
 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,56 +16,51 @@ specific language governing permissions and limitations
 under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.nemo</groupId>
+    <artifactId>nemo-compiler</artifactId>
+    <version>0.2-SNAPSHOT</version>
+    <relativePath>../../</relativePath>
+  </parent>
+
+  <artifactId>nemo-compiler-frontend-beam</artifactId>
+  <name>Nemo Compiler Frontend: Beam</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.nemo</groupId>
+      <artifactId>nemo-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <version>${beam.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-java</artifactId>
+      <version>${beam.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
+      <version>${beam.version}</version>
+    </dependency>
 
-    <parent>
-        <groupId>org.apache.nemo</groupId>
-        <artifactId>nemo-compiler</artifactId>
-        <version>0.2-SNAPSHOT</version>
-        <relativePath>../../</relativePath>
-    </parent>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk -->
 
-    <artifactId>nemo-compiler-frontend-beam</artifactId>
-    <name>Nemo Compiler Frontend: Beam</name>
-    <packaging>jar</packaging>
 
-    <dependencies>
-	      <dependency>
-            <groupId>org.apache.nemo</groupId>
-            <artifactId>nemo-common</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nemo</groupId>
-            <artifactId>nemo-client</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.beam</groupId>
-            <artifactId>beam-sdks-java-core</artifactId>
-            <version>${beam.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.beam</groupId>
-            <artifactId>beam-runners-core-java</artifactId>
-            <version>${beam.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.beam</groupId>
-            <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
-            <version>${beam.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.google.auto.service</groupId>
-            <artifactId>auto-service</artifactId>
-            <version>${auto-service.version}</version>
-            <optional>true</optional>
-        </dependency>
-    </dependencies>
+  </dependencies>
 </project>
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
index 5383a2c..f1bb460 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
@@ -92,7 +92,7 @@ public final class InMemorySideInputReader implements ReadyCheckingSideInputRead
 
   /**
    * Say a DoFn of this reader has 3 main inputs and 4 side inputs.
-   * {@link org.apache.nemo.runtime.executor.datatransfer.InputWatermarkManager} guarantees that the watermark here
+   * Nemo runtime guarantees that the watermark here
    * is the minimum of the all 7 input streams.
    * @param newWatermark to set.
    */
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineVisitor.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineVisitor.java
index 6c02088..fa3d00d 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineVisitor.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineVisitor.java
@@ -35,7 +35,7 @@ public final class PipelineVisitor extends Pipeline.PipelineVisitor.Defaults {
    * @param pipeline to visit.
    * @param pipelineOptions pipeline options.
    */
-  PipelineVisitor(final Pipeline pipeline, final NemoPipelineOptions pipelineOptions) {
+  public PipelineVisitor(final Pipeline pipeline, final NemoPipelineOptions pipelineOptions) {
     this.context = new PipelineTranslationContext(pipeline, pipelineOptions);
   }
 
@@ -61,7 +61,7 @@ public final class PipelineVisitor extends Pipeline.PipelineVisitor.Defaults {
   /**
    * @return the converted pipeline.
    */
-  IRDAG getConvertedPipeline() {
+  public IRDAG getConvertedPipeline() {
     return new IRDAG(context.getBuilder().build());
   }
 }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
index d082c2a..c2080cf 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.SourceVertex;
+import org.apache.nemo.common.test.EmptyComponents;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,10 +82,17 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
 
   @Override
   public List<Readable<Object>> getReadables(final int desiredNumOfSplits) throws Exception {
+
     final List<Readable<Object>> readables = new ArrayList<>();
-    source.split(desiredNumOfSplits, null)
-      .forEach(unboundedSource -> readables.add(new UnboundedSourceReadable<>(unboundedSource)));
-    return readables;
+    if (source != null) {
+      source.split(desiredNumOfSplits, null)
+        .forEach(unboundedSource -> readables.add(new UnboundedSourceReadable<>(unboundedSource)));
+      return readables;
+    } else {
+      // TODO #333: Remove SourceVertex#clearInternalStates
+      final SourceVertex emptySourceVertex = new EmptyComponents.EmptySourceVertex("EMPTY");
+      return emptySourceVertex.getReadables(desiredNumOfSplits);
+    }
   }
 
   @Override
diff --git a/examples/beam/pom.xml b/examples/beam/pom.xml
index 3388d99..2748249 100644
--- a/examples/beam/pom.xml
+++ b/examples/beam/pom.xml
@@ -37,6 +37,12 @@ under the License.
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.nemo</groupId>
+            <artifactId>nemo-client</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>com.github.fommil.netlib</groupId>
             <artifactId>all</artifactId>
             <version>${netlib.version}</version>
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
index 7407bfd..84dcc93 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
@@ -20,12 +20,10 @@ package org.apache.nemo.examples.beam;
 
 import com.github.fommil.netlib.BLAS;
 import com.github.fommil.netlib.LAPACK;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderProviders;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.*;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -353,7 +351,7 @@ public final class AlternatingLeastSquare {
    * Main function for the ALS BEAM program.
    * @param args arguments.
    */
-  public static void main(final String[] args) {
+  public static void main(final String[] args) throws ClassNotFoundException {
     final Long start = System.currentTimeMillis();
     LOG.info(Arrays.toString(args));
     final String inputFilePath = args[0];
@@ -374,8 +372,7 @@ public final class AlternatingLeastSquare {
       outputFilePath = "";
     }
 
-    final PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(NemoRunner.class);
+    final PipelineOptions options = NemoPipelineOptionsFactory.create();
     options.setJobName("ALS");
     options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
index 5af8dd1..06588e6 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
@@ -19,11 +19,9 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderProviders;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -102,7 +100,7 @@ public final class AlternatingLeastSquareInefficient {
    * Main function for the ALS BEAM program.
    * @param args arguments.
    */
-  public static void main(final String[] args) {
+  public static void main(final String[] args) throws ClassNotFoundException {
     final Long start = System.currentTimeMillis();
     LOG.info(Arrays.toString(args));
     final String inputFilePath = args[0];
@@ -115,8 +113,7 @@ public final class AlternatingLeastSquareInefficient {
       lambda = 0.05;
     }
 
-    final PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(NemoRunner.class);
+    final PipelineOptions options = NemoPipelineOptionsFactory.create();
     options.setJobName("ALS");
     options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
index 5ded58b..522eee2 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
@@ -18,10 +18,8 @@
  */
 package org.apache.nemo.examples.beam;
 
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -48,8 +46,7 @@ public final class Broadcast {
   public static void main(final String[] args) {
     final String inputFilePath = args[0];
     final String outputFilePath = args[1];
-    final PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(NemoRunner.class);
+    final PipelineOptions options = NemoPipelineOptionsFactory.create();
 
     final Pipeline p = Pipeline.create(options);
     final PCollection<String> elemCollection = GenericSourceSink.read(p, inputFilePath);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
index 5549d10..2b595f6 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
@@ -20,15 +20,12 @@ package org.apache.nemo.examples.beam;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.FlatMapElements;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TypeDescriptors;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 
 import java.util.Arrays;
 /**
@@ -47,8 +44,7 @@ public final class MinimalWordCount {
   public static void main(final String[] args) {
     final String inputFilePath = args[0];
     final String outputFilePath = args[1];
-    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoRunner.class);
+    final PipelineOptions options = NemoPipelineOptionsFactory.create();
     options.setJobName("MinimalWordCount");
     // Create the Pipeline object with the options we defined above
     final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
index 2b37eba..7a671fa 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
@@ -19,11 +19,9 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.nemo.common.Pair;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.*;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -419,8 +417,7 @@ public final class MultinomialLogisticRegression {
       initialModelKeys.add(i);
     }
 
-    final PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(NemoRunner.class);
+    final PipelineOptions options = NemoPipelineOptionsFactory.create();
     options.setJobName("MLR");
     options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/NemoPipelineOptionsFactory.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NemoPipelineOptionsFactory.java
new file mode 100644
index 0000000..000d238
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NemoPipelineOptionsFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+
+/**
+ * NemoPipelineOptionsFactory that creates nemo pipeline options.
+ */
+final class NemoPipelineOptionsFactory {
+
+  private NemoPipelineOptionsFactory() {
+  }
+
+  /**
+   * Create a PipelineOptions for nemo runner.
+   * @return pipeline options
+   */
+  public static PipelineOptions create() {
+    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+    try {
+      options.setRunner((Class<? extends PipelineRunner<?>>)
+        Class.forName("org.apache.nemo.client.beam.NemoRunner"));
+      return options;
+    } catch (final ClassNotFoundException e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
index af6cff6..e2a829d 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
@@ -18,11 +18,8 @@
  */
 package org.apache.nemo.examples.beam;
 
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.*;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -58,8 +55,7 @@ public final class NetworkTraceAnalysis {
     final String input0FilePath = args[0];
     final String input1FilePath = args[1];
     final String outputFilePath = args[2];
-    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoRunner.class);
+    final PipelineOptions options = NemoPipelineOptionsFactory.create();
     options.setJobName("NetworkTraceAnalysis");
 
     // Given "4 0.0 192.168.3.1 -> 192.168.0.2 Len=29", this finds "192.168.3.1", "192.168.0.2" and "29"
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
index 816be62..c4bf5d8 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
@@ -18,11 +18,8 @@
  */
 package org.apache.nemo.examples.beam;
 
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.*;
 import org.apache.beam.sdk.values.*;
 
@@ -46,8 +43,7 @@ public final class PartitionWordsByLength {
   public static void main(final String[] args) {
     final String inputFilePath = args[0];
     final String outputFilePath = args[1];
-    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoRunner.class);
+    final PipelineOptions options = NemoPipelineOptionsFactory.create();
     options.setJobName("PartitionWordsByLength");
 
     // {} here is required for preserving type information.
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
index 6255a93..268f4bc 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
@@ -18,11 +18,8 @@
  */
 package org.apache.nemo.examples.beam;
 
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.*;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -52,8 +49,7 @@ public final class PerKeyMedian {
   public static void main(final String[] args) {
     final String inputFilePath = args[0];
     final String outputFilePath = args[1];
-    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoRunner.class);
+    final PipelineOptions options = NemoPipelineOptionsFactory.create();
     options.setJobName("PerKeyMedian");
 
     final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
index 1f3b700..e46946f 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
@@ -19,12 +19,9 @@
 package org.apache.nemo.examples.beam;
 
 import com.google.common.collect.Lists;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.*;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -51,8 +48,7 @@ public final class PerPercentileAverage {
   public static void main(final String[] args) {
     final String inputFilePath = args[0];
     final String outputFilePath = args[1];
-    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoRunner.class);
+    final PipelineOptions options = NemoPipelineOptionsFactory.create();
     options.setJobName("PerPercentileAverage");
 
     final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
index 6fa8f5e..84e20fd 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
@@ -21,12 +21,9 @@ package org.apache.nemo.examples.beam;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.*;
 import org.apache.beam.sdk.values.*;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -49,8 +46,7 @@ public final class SimpleSumSQL {
   public static void main(final String[] args) {
     final String outputFilePath = args[0];
 
-    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoRunner.class);
+    final PipelineOptions options = NemoPipelineOptionsFactory.create();
     options.setJobName("SimpleSumSQL");
     final Pipeline p = Pipeline.create(options);
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
index d26e102..d01a41e 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
@@ -21,14 +21,11 @@ package org.apache.nemo.examples.beam;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.*;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -65,8 +62,7 @@ public final class WindowedBroadcast {
       .<Long>into(SlidingWindows.of(Duration.standardSeconds(2))
       .every(Duration.standardSeconds(1)));
 
-    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoRunner.class);
+    final PipelineOptions options = NemoPipelineOptionsFactory.create();
     options.setJobName("WindowedBroadcast");
 
     final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
index 007129c..eaa3f05 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
@@ -21,15 +21,12 @@ package org.apache.nemo.examples.beam;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.*;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -110,8 +107,7 @@ public final class WindowedWordCount {
         .every(Duration.standardSeconds(5)));
     }
 
-    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoRunner.class);
+    final PipelineOptions options = NemoPipelineOptionsFactory.create();
     options.setJobName("WindowedWordCount");
 
     final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
index ba3cb80..6ffaa2e 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
@@ -18,11 +18,8 @@
  */
 package org.apache.nemo.examples.beam;
 
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.*;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -44,8 +41,7 @@ public final class WordCount {
   public static void main(final String[] args) {
     final String inputFilePath = args[0];
     final String outputFilePath = args[1];
-    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoRunner.class);
+    final PipelineOptions options = NemoPipelineOptionsFactory.create();
     options.setJobName("WordCount");
 
     final Pipeline p = Pipeline.create(options);
diff --git a/examples/nexmark/pom.xml b/examples/nexmark/pom.xml
new file mode 100644
index 0000000..23a4ee1
--- /dev/null
+++ b/examples/nexmark/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>nemo-examples</artifactId>
+    <groupId>org.apache.nemo</groupId>
+    <version>0.2-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>nexmark</artifactId>
+  <name>Nemo Examples: NEXMark</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-nexmark</artifactId>
+      <version>${beam.version}</version>
+    </dependency>
+
+    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-google-cloud-platform -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+      <version>${beam.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>3.0.0</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <outputFile>
+                ${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar
+              </outputFile>
+              <transformers>
+                <!-- Required for using beam-hadoop: See https://stackoverflow.com/questions/44365545
+                -->
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/examples/pom.xml b/examples/pom.xml
index ed66ccb..8be9bd1 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -32,6 +32,7 @@ under the License.
   <modules>
     <module>beam</module>
     <module>spark</module>
+    <module>nexmark</module>
   </modules>