Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/12/03 06:07:27 UTC

[GitHub] taegeonum closed pull request #163: [NEMO-294] Beam-Runner

taegeonum closed pull request #163: [NEMO-294] Beam-Runner
URL: https://github.com/apache/incubator-nemo/pull/163
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is reproduced below for the sake of provenance:

diff --git a/bin/run_beam.sh b/bin/run_beam.sh
index 41c1ef598..cbd082c7a 100755
--- a/bin/run_beam.sh
+++ b/bin/run_beam.sh
@@ -17,4 +17,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/beam/target/nemo-examples-beam-0.1-SNAPSHOT-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
+java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/beam/target/nemo-examples-beam-$(mvn -q \
+  -Dexec.executable=echo -Dexec.args='${project.version}' \
+  --non-recursive exec:exec)-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
diff --git a/bin/run_spark.sh b/bin/run_spark.sh
index 057b01747..314fd0d0b 100755
--- a/bin/run_spark.sh
+++ b/bin/run_spark.sh
@@ -17,4 +17,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/spark/target/nemo-examples-spark-0.1-SNAPSHOT-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
+java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/spark/target/nemo-examples-spark-$(mvn -q \
+  -Dexec.executable=echo -Dexec.args='${project.version}' \
+  --non-recursive exec:exec)-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index 035d71985..f95bc73fc 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -101,6 +101,23 @@ private JobLauncher() {
    * @throws Exception exception on the way.
    */
   public static void main(final String[] args) throws Exception {
+    try {
+      setup(args);
+      // Launch client main. The shutdown() method is called inside the launchDAG() method.
+      runUserProgramMain(builtJobConf);
+    } catch (final InjectionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Set up the driver, etc. before the actual execution.
+   * @param args arguments.
+   * @throws InjectionException injection exception from REEF.
+   * @throws ClassNotFoundException class not found exception.
+   * @throws IOException IO exception.
+   */
+  public static void setup(final String[] args) throws InjectionException, ClassNotFoundException, IOException {
     // Get Job and Driver Confs
     builtJobConf = getJobConf(args);
 
@@ -108,77 +125,76 @@ public static void main(final String[] args) throws Exception {
     LOG.info("Launching RPC Server");
     driverRPCServer = new DriverRPCServer();
     driverRPCServer
-        .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event -> {
-        })
-        .registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, event -> driverReadyLatch.countDown())
-        .registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, event -> jobDoneLatch.countDown())
-        .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> COLLECTED_DATA.addAll(
-            SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()))))
-        .run();
+      .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event -> {
+      })
+      .registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, event -> driverReadyLatch.countDown())
+      .registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, event -> jobDoneLatch.countDown())
+      .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> COLLECTED_DATA.addAll(
+        SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()))))
+      .run();
 
     final Configuration driverConf = getDriverConf(builtJobConf);
     final Configuration driverNcsConf = getDriverNcsConf();
-    final Configuration driverMessageConfg = getDriverMessageConf();
+    final Configuration driverMessageConfig = getDriverMessageConf();
+    final String defaultExecutorResourceConfig = "[{\"type\":\"Transient\",\"memory_mb\":512,\"capacity\":5},"
+      + "{\"type\":\"Reserved\",\"memory_mb\":512,\"capacity\":5}]";
     final Configuration executorResourceConfig = getJSONConf(builtJobConf, JobConf.ExecutorJSONPath.class,
-        JobConf.ExecutorJSONContents.class);
+      JobConf.ExecutorJSONContents.class, defaultExecutorResourceConfig);
     final Configuration bandwidthConfig = getJSONConf(builtJobConf, JobConf.BandwidthJSONPath.class,
-        JobConf.BandwidthJSONContents.class);
+      JobConf.BandwidthJSONContents.class, "");
     final Configuration clientConf = getClientConf();
     final Configuration schedulerConf = getSchedulerConf(builtJobConf);
 
     // Merge Job and Driver Confs
-    jobAndDriverConf = Configurations.merge(builtJobConf, driverConf, driverNcsConf, driverMessageConfg,
-        executorResourceConfig, bandwidthConfig, driverRPCServer.getListeningConfiguration(), schedulerConf);
+    jobAndDriverConf = Configurations.merge(builtJobConf, driverConf, driverNcsConf, driverMessageConfig,
+      executorResourceConfig, bandwidthConfig, driverRPCServer.getListeningConfiguration(), schedulerConf);
 
     // Get DeployMode Conf
     deployModeConf = Configurations.merge(getDeployModeConf(builtJobConf), clientConf);
 
     // Start Driver and launch user program.
-    try {
-      if (jobAndDriverConf == null || deployModeConf == null || builtJobConf == null) {
-        throw new RuntimeException("Configuration for launching driver is not ready");
-      }
-
-
-      // Launch driver
-      LOG.info("Launching driver");
-      driverReadyLatch = new CountDownLatch(1);
-      driverLauncher = DriverLauncher.getLauncher(deployModeConf);
-      driverLauncher.submit(jobAndDriverConf, 500);
-      // When the driver is up and the resource is ready, the DriverReady message is delivered.
+    if (jobAndDriverConf == null || deployModeConf == null || builtJobConf == null) {
+      throw new RuntimeException("Configuration for launching driver is not ready");
+    }
 
-      // Launch client main
-      runUserProgramMain(builtJobConf);
+    // Launch driver
+    LOG.info("Launching driver");
+    driverReadyLatch = new CountDownLatch(1);
+    driverLauncher = DriverLauncher.getLauncher(deployModeConf);
+    driverLauncher.submit(jobAndDriverConf, 500);
+    // When the driver is up and the resource is ready, the DriverReady message is delivered.
+  }
 
-      // Trigger driver shutdown afterwards
-      driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
-          .setType(ControlMessage.ClientToDriverMessageType.DriverShutdown).build());
-      // Wait for driver to naturally finish
-      synchronized (driverLauncher) {
-        while (!driverLauncher.getStatus().isDone()) {
-          try {
-            LOG.info("Wait for the driver to finish");
-            driverLauncher.wait();
-          } catch (final InterruptedException e) {
-            LOG.warn("Interrupted: " + e);
-            // clean up state...
-            Thread.currentThread().interrupt();
-          }
+  /**
+   * Clean up everything.
+   */
+  public static void shutdown() {
+    // Trigger driver shutdown afterwards
+    driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
+      .setType(ControlMessage.ClientToDriverMessageType.DriverShutdown).build());
+    // Wait for driver to naturally finish
+    synchronized (driverLauncher) {
+      while (!driverLauncher.getStatus().isDone()) {
+        try {
+          LOG.info("Wait for the driver to finish");
+          driverLauncher.wait();
+        } catch (final InterruptedException e) {
+          LOG.warn("Interrupted: " + e);
+          // clean up state...
+          Thread.currentThread().interrupt();
         }
-        LOG.info("Driver terminated");
-      }
-    } catch (final InjectionException e) {
-      throw new RuntimeException(e);
-    } finally {
-      // Close everything that's left
-      driverRPCServer.shutdown();
-      driverLauncher.close();
-      final Optional<Throwable> possibleError = driverLauncher.getStatus().getError();
-      if (possibleError.isPresent()) {
-        throw new RuntimeException(possibleError.get());
-      } else {
-        LOG.info("Job successfully completed");
       }
+      LOG.info("Driver terminated");
+    }
+
+    // Close everything that's left
+    driverRPCServer.shutdown();
+    driverLauncher.close();
+    final Optional<Throwable> possibleError = driverLauncher.getStatus().getError();
+    if (possibleError.isPresent()) {
+      throw new RuntimeException(possibleError.get());
+    } else {
+      LOG.info("Job successfully completed");
     }
   }
 
@@ -191,14 +207,32 @@ public static void main(final String[] args) throws Exception {
    */
   // When modifying the signature of this method, see CompilerTestUtil#compileDAG and make corresponding changes
   public static void launchDAG(final DAG dag) {
-    launchDAG(dag, Collections.emptyMap());
+    launchDAG(dag, Collections.emptyMap(), "");
+  }
+
+  /**
+   * @param dag the application DAG.
+   * @param jobId job ID.
+   */
+  public static void launchDAG(final DAG dag, final String jobId) {
+    launchDAG(dag, Collections.emptyMap(), jobId);
   }
 
   /**
    * @param dag the application DAG.
    * @param broadcastVariables broadcast variables (can be empty).
+   * @param jobId job ID.
    */
-  public static void launchDAG(final DAG dag, final Map<Serializable, Object> broadcastVariables) {
+  public static void launchDAG(final DAG dag, final Map<Serializable, Object> broadcastVariables, final String jobId) {
+    // launch driver if it hasn't been already
+    if (driverReadyLatch == null) {
+      try {
+        setup(new String[]{"-job_id", jobId});
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
     // Wait until the driver is ready.
     try {
       LOG.info("Waiting for the driver to be ready");
@@ -229,8 +263,11 @@ public static void launchDAG(final DAG dag, final Map<Serializable, Object> broa
       // clean up state...
       Thread.currentThread().interrupt();
       throw new RuntimeException(e);
+    } finally {
+      LOG.info("DAG execution done");
+      // trigger shutdown.
+      shutdown();
     }
-    LOG.info("DAG execution done");
   }
 
   /**
@@ -267,6 +304,13 @@ private static Configuration getClientConf() {
     return jcb.build();
   }
 
+  /**
+   * Fetch scheduler configuration.
+   * @param jobConf job configuration.
+   * @return the scheduler configuration.
+   * @throws ClassNotFoundException exception while finding the class.
+   * @throws InjectionException exception while injection (REEF Tang).
+   */
   private static Configuration getSchedulerConf(final Configuration jobConf)
     throws ClassNotFoundException, InjectionException {
     final Injector injector = TANG.newInjector(jobConf);
@@ -399,13 +443,14 @@ private static Configuration getDeployModeConf(final Configuration jobConf) thro
    */
   private static Configuration getJSONConf(final Configuration jobConf,
                                            final Class<? extends Name<String>> pathParameter,
-                                           final Class<? extends Name<String>> contentsParameter)
+                                           final Class<? extends Name<String>> contentsParameter,
+                                           final String defaultContent)
       throws InjectionException {
     final Injector injector = TANG.newInjector(jobConf);
     try {
       final String path = injector.getNamedInstance(pathParameter);
-      final String contents = path.isEmpty() ? ""
-          : new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
+      final String contents = path.isEmpty() ? defaultContent
+        : new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
       return TANG.newConfigurationBuilder()
           .bindNamedParameter(contentsParameter, contents)
           .build();
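
The JobLauncher change above splits the old monolithic main() into setup() (driver
launch), launchDAG() (DAG submission, which now lazily calls setup() when no driver
has been launched yet) and shutdown() (driver teardown, reached from launchDAG()'s
finally block). A rough sketch of how a frontend would submit a DAG under the new
flow; the DAG import location and the job id are illustrative assumptions, not part
of the PR:

import org.apache.nemo.client.JobLauncher;
import org.apache.nemo.common.dag.DAG;  // assumed location of Nemo's DAG class

public final class FrontendSketch {
  private FrontendSketch() {
  }

  static void submit(final DAG dag) {
    // If setup() was never called, launchDAG() invokes it with {"-job_id", "MyJob"},
    // waits for the DriverReady message, submits the DAG with empty broadcast
    // variables, and finally calls shutdown() to stop the RPC server and the driver.
    JobLauncher.launchDAG(dag, "MyJob");
  }
}
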
diff --git a/compiler/frontend/beam/pom.xml b/compiler/frontend/beam/pom.xml
index 01000f7fe..02fab3a56 100644
--- a/compiler/frontend/beam/pom.xml
+++ b/compiler/frontend/beam/pom.xml
@@ -29,9 +29,10 @@ under the License.
 
     <artifactId>nemo-compiler-frontend-beam</artifactId>
     <name>Nemo Compiler Frontend: Beam</name>
+    <packaging>jar</packaging>
 
     <dependencies>
-	    <dependency>
+	      <dependency>
             <groupId>org.apache.nemo</groupId>
             <artifactId>nemo-common</artifactId>
             <version>${project.version}</version>
@@ -46,11 +47,11 @@ under the License.
             <artifactId>beam-sdks-java-core</artifactId>
             <version>${beam.version}</version>
         </dependency>
-      <dependency>
+        <dependency>
             <groupId>org.apache.beam</groupId>
             <artifactId>beam-runners-core-java</artifactId>
             <version>${beam.version}</version>
-      </dependency>
+        </dependency>
         <dependency>
             <groupId>org.apache.beam</groupId>
             <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
@@ -62,5 +63,11 @@ under the License.
             <version>${hadoop.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.google.auto.service</groupId>
+            <artifactId>auto-service</artifactId>
+            <version>${auto-service.version}</version>
+            <optional>true</optional>
+        </dependency>
     </dependencies>
 </project>
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
index 57f163401..1cfbf7afe 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
@@ -57,10 +57,7 @@ public State waitUntilFinish(final Duration duration) {
 
   @Override
   public State waitUntilFinish() {
-    throw new UnsupportedOperationException();
-    // TODO #208: NemoPipelineResult#waitUntilFinish hangs
-    // Previous code that hangs the job:
-    // return (State) super.waitUntilJobFinish();
+    return waitUntilFinish(Duration.ZERO);
   }
 
   @Override
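
With the change above, NemoPipelineResult.waitUntilFinish() delegates to
waitUntilFinish(Duration.ZERO) instead of throwing UnsupportedOperationException
(the removed TODO referenced issue #208). A minimal usage sketch, assuming the Beam
frontend from this PR is on the classpath; the transforms themselves are elided:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
import org.apache.nemo.compiler.frontend.beam.NemoRunner;

public final class WaitUntilFinishSketch {
  private WaitUntilFinishSketch() {
  }

  public static void main(final String[] args) {
    final NemoPipelineOptions options = PipelineOptionsFactory.as(NemoPipelineOptions.class);
    options.setRunner(NemoRunner.class);
    options.setJobName("WaitUntilFinishSketch");
    final Pipeline p = Pipeline.create(options);
    // ... apply transforms to p ...

    // Previously threw UnsupportedOperationException; now intended to block
    // until the job completes and report its final state.
    final PipelineResult.State state = p.run().waitUntilFinish();
    System.out.println("Pipeline finished in state: " + state);
  }
}
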
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunner.java
similarity index 67%
rename from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
rename to compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunner.java
index d011d11c8..91282132e 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.compiler.frontend.beam;
 
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.nemo.client.JobLauncher;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
@@ -27,25 +28,46 @@
 /**
  * Runner class for BEAM programs.
  */
-public final class NemoPipelineRunner extends PipelineRunner<NemoPipelineResult> {
+public final class NemoRunner extends PipelineRunner<NemoPipelineResult> {
   private final NemoPipelineOptions nemoPipelineOptions;
 
   /**
    * BEAM Pipeline Runner.
    * @param nemoPipelineOptions PipelineOptions.
    */
-  private NemoPipelineRunner(final NemoPipelineOptions nemoPipelineOptions) {
+  private NemoRunner(final NemoPipelineOptions nemoPipelineOptions) {
     this.nemoPipelineOptions = nemoPipelineOptions;
   }
 
+  /**
+   * Creates and returns a new NemoRunner with default options.
+   *
+   * @return A pipeline runner with default options.
+   */
+  public static NemoRunner create() {
+    NemoPipelineOptions options = PipelineOptionsFactory.as(NemoPipelineOptions.class);
+    options.setRunner(NemoRunner.class);
+    return new NemoRunner(options);
+  }
+
+  /**
+   * Creates and returns a new NemoRunner with specified options.
+   *
+   * @param options The NemoPipelineOptions to use when executing the job.
+   * @return A pipeline runner that will execute with specified options.
+   */
+  public static NemoRunner create(final NemoPipelineOptions options) {
+    return new NemoRunner(options);
+  }
+
   /**
    * Static initializer for creating PipelineRunner with the given options.
    * @param options given PipelineOptions.
    * @return The created PipelineRunner.
    */
-  public static PipelineRunner<NemoPipelineResult> fromOptions(final PipelineOptions options) {
+  public static NemoRunner fromOptions(final PipelineOptions options) {
     final NemoPipelineOptions nemoOptions = PipelineOptionsValidator.validate(NemoPipelineOptions.class, options);
-    return new NemoPipelineRunner(nemoOptions);
+    return new NemoRunner(nemoOptions);
   }
 
   /**
@@ -57,7 +79,7 @@ public NemoPipelineResult run(final Pipeline pipeline) {
     final PipelineVisitor pipelineVisitor = new PipelineVisitor(pipeline, nemoPipelineOptions);
     pipeline.traverseTopologically(pipelineVisitor);
     final NemoPipelineResult nemoPipelineResult = new NemoPipelineResult();
-    JobLauncher.launchDAG(pipelineVisitor.getConvertedPipeline());
+    JobLauncher.launchDAG(pipelineVisitor.getConvertedPipeline(), nemoPipelineOptions.getJobName());
     return nemoPipelineResult;
   }
 }
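
The renamed NemoRunner also gains create() and create(options) factories alongside
fromOptions(), and run() now forwards the configured job name to
JobLauncher.launchDAG(). A hedged sketch of constructing the runner both ways; the
job name is an illustrative value:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineResult;
import org.apache.nemo.compiler.frontend.beam.NemoRunner;

public final class NemoRunnerFactorySketch {
  private NemoRunnerFactorySketch() {
  }

  public static void main(final String[] args) {
    // create(): default NemoPipelineOptions with the runner preset to NemoRunner.
    final NemoRunner defaultRunner = NemoRunner.create();

    // create(options): the job name set here becomes the Nemo job id passed to
    // JobLauncher.launchDAG(dag, jobName) when the pipeline runs.
    final NemoPipelineOptions options = PipelineOptionsFactory.as(NemoPipelineOptions.class);
    options.setRunner(NemoRunner.class);
    options.setJobName("FactorySketch");
    final NemoRunner runner = NemoRunner.create(options);

    final Pipeline p = Pipeline.create(options);
    // ... apply transforms to p ...
    final NemoPipelineResult result = runner.run(p);
  }
}
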
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunnerRegistrar.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunnerRegistrar.java
new file mode 100644
index 000000000..aa05519f8
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunnerRegistrar.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the {@link NemoRunner}.
+ *
+ * {@link AutoService} will register Nemo's implementations of the {@link PipelineRunner} and {@link PipelineOptions}
+ * as available pipeline runner services.
+ */
+public final class NemoRunnerRegistrar {
+  /**
+   * Private constructor.
+   */
+  private NemoRunnerRegistrar() {
+  }
+
+  /**
+   * Registers the {@link NemoRunner}.
+   */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static final class Runner implements PipelineRunnerRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.of(NemoRunner.class);
+    }
+  }
+
+  /**
+   * Registers the {@link NemoPipelineOptions}.
+   */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static final class Options implements PipelineOptionsRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.of(NemoPipelineOptions.class);
+    }
+  }
+}
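
Because these registrars are published through AutoService, Beam's
PipelineOptionsFactory can resolve the runner and its options by name, with no
compile-time dependency on Nemo classes in the user program. A sketch of what the
registration enables; the argument values are illustrative:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public final class RunnerByNameSketch {
  private RunnerByNameSketch() {
  }

  public static void main(final String[] args) {
    // "--runner=NemoRunner" is resolved through the PipelineRunnerRegistrar above,
    // and NemoPipelineOptions is discovered through the PipelineOptionsRegistrar,
    // so only the nemo-compiler-frontend-beam jar needs to be on the classpath.
    final PipelineOptions options = PipelineOptionsFactory
        .fromArgs("--runner=NemoRunner", "--jobName=RegistrarSketch")
        .withValidation()
        .create();
    final Pipeline p = Pipeline.create(options);
    // ... apply transforms and call p.run() ...
  }
}
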
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
index 3cb755511..0a5c0e504 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
@@ -105,7 +105,7 @@ public static Serializer deriveSerializerFrom(final org.apache.spark.SparkContex
     builder.connectVertices(newEdge);
 
     // launch DAG
-    JobLauncher.launchDAG(builder.build(), SparkBroadcastVariables.getAll());
+    JobLauncher.launchDAG(builder.build(), SparkBroadcastVariables.getAll(), "");
 
     return (List<T>) JobLauncher.getCollectedData();
   }
diff --git a/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala b/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala
index 3cf516a0b..234e86784 100644
--- a/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala
+++ b/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala
@@ -230,7 +230,7 @@ final class RDD[T: ClassTag] protected[rdd] (
     newEdge.setProperty(keyExtractorProperty)
 
     builder.connectVertices(newEdge)
-    JobLauncher.launchDAG(builder.build, SparkBroadcastVariables.getAll)
+    JobLauncher.launchDAG(builder.build, SparkBroadcastVariables.getAll, "")
   }
 
   /////////////// CACHING ///////////////
diff --git a/compiler/test/src/main/java/org/apache/nemo/compiler/CompilerTestUtil.java b/compiler/test/src/main/java/org/apache/nemo/compiler/CompilerTestUtil.java
index 2a4d35987..d948138c4 100644
--- a/compiler/test/src/main/java/org/apache/nemo/compiler/CompilerTestUtil.java
+++ b/compiler/test/src/main/java/org/apache/nemo/compiler/CompilerTestUtil.java
@@ -86,8 +86,9 @@ private static String findRoot(final String curDir) {
     final Method userMainMethod = userMainClass.getMethod("main", String[].class);
 
     final ArgumentCaptor<DAG> captor = ArgumentCaptor.forClass(DAG.class);
+    final ArgumentCaptor<String> stringArg = ArgumentCaptor.forClass(String.class);
     PowerMockito.mockStatic(JobLauncher.class);
-    PowerMockito.doNothing().when(JobLauncher.class, "launchDAG", captor.capture());
+    PowerMockito.doNothing().when(JobLauncher.class, "launchDAG", captor.capture(), stringArg.capture());
     userMainMethod.invoke(null, (Object) userMainMethodArgs);
     return captor.getValue();
   }
diff --git a/conf/src/main/java/org/apache/nemo/conf/JobConf.java b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
index 7de0e18aa..1bdb7c9e0 100644
--- a/conf/src/main/java/org/apache/nemo/conf/JobConf.java
+++ b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
@@ -165,7 +165,7 @@
    * Path to the JSON file that specifies resource layout.
    */
   @NamedParameter(doc = "Path to the JSON file that specifies resources for executors", short_name = "executor_json",
-      default_value = "examples/resources/test_executor_resources.json")
+      default_value = "")
   public final class ExecutorJSONPath implements Name<String> {
   }
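
With executor_json now defaulting to an empty path, JobLauncher (see the getJSONConf
change above) falls back to its in-code default of two 512 MB executor types,
Transient and Reserved, each with capacity 5. A file passed with -executor_json would
follow the same schema; a hypothetical example:

[
  {"type": "Transient", "memory_mb": 512, "capacity": 5},
  {"type": "Reserved", "memory_mb": 512, "capacity": 5}
]
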
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
index 76bdc03ae..f3ce915a6 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
@@ -20,7 +20,7 @@
 
 import com.github.fommil.netlib.BLAS;
 import com.github.fommil.netlib.LAPACK;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderProviders;
@@ -371,7 +371,7 @@ public static void main(final String[] args) {
     }
 
     final PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("ALS");
     options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
index ab1760fb5..5af8dd1e6 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderProviders;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -116,7 +116,7 @@ public static void main(final String[] args) {
     }
 
     final PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("ALS");
     options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
index da609e40a..5ded58b80 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.examples.beam;
 
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -49,7 +49,7 @@ public static void main(final String[] args) {
     final String inputFilePath = args[0];
     final String outputFilePath = args[1];
     final PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
 
     final Pipeline p = Pipeline.create(options);
     final PCollection<String> elemCollection = GenericSourceSink.read(p, inputFilePath);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
new file mode 100644
index 000000000..5549d10f7
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.examples.beam;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.FlatMapElements;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
+
+import java.util.Arrays;
+/**
+ * MinimalWordCount program from BEAM.
+ */
+public final class MinimalWordCount {
+  /**
+   * Private Constructor.
+   */
+  private MinimalWordCount() {
+  }
+  /**
+   * Main function for the MinimalWordCount Beam program.
+   * @param args arguments.
+   */
+  public static void main(final String[] args) {
+    final String inputFilePath = args[0];
+    final String outputFilePath = args[1];
+    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+    options.setRunner(NemoRunner.class);
+    options.setJobName("MinimalWordCount");
+    // Create the Pipeline object with the options we defined above
+    final Pipeline p = Pipeline.create(options);
+    // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
+    // of input text files. TextIO.Read returns a PCollection where each element is one line from
+    // the input text (a set of Shakespeare's texts).
+    // This example reads a public data set consisting of the complete works of Shakespeare.
+    p.apply(TextIO.read().from(inputFilePath))
+      // Concept #2: Apply a FlatMapElements transform the PCollection of text lines.
+      // This transform splits the lines in PCollection<String>, where each element is an
+      // individual word in Shakespeare's collected texts.
+      .apply(
+        FlatMapElements.into(TypeDescriptors.strings())
+          .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
+      // We use a Filter transform to avoid empty word
+      .apply(Filter.by((String word) -> !word.isEmpty()))
+      // Concept #3: Apply the Count transform to our PCollection of individual words. The Count
+      // transform returns a new PCollection of key/value pairs, where each key represents a
+      // unique word in the text. The associated value is the occurrence count for that word.
+      .apply(Count.perElement())
+      // Apply a MapElements transform that formats our PCollection of word counts into a
+      // printable string, suitable for writing to an output file.
+      .apply(
+        MapElements.into(TypeDescriptors.strings())
+          .via(
+            (KV<String, Long> wordCount) ->
+              wordCount.getKey() + ": " + wordCount.getValue()))
+      // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.
+      // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of
+      // formatted strings) to a series of text files.
+      //
+      // By default, it will write to a set of files with names like wordcounts-00001-of-00005
+      .apply(TextIO.write().to(outputFilePath));
+    p.run();
+  }
+}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
index 921b86252..2b37eba84 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.nemo.common.Pair;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -420,7 +420,7 @@ public static void main(final String[] args) {
     }
 
     final PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("MLR");
     options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
index e0567c6ef..a9bbc4350 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -59,7 +59,7 @@ public static void main(final String[] args) {
     final String input1FilePath = args[1];
     final String outputFilePath = args[2];
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("NetworkTraceAnalysis");
 
     // Given "4 0.0 192.168.3.1 -> 192.168.0.2 Len=29", this finds "192.168.3.1", "192.168.0.2" and "29"
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
index b446858c5..816be62bb 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -47,7 +47,7 @@ public static void main(final String[] args) {
     final String inputFilePath = args[0];
     final String outputFilePath = args[1];
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("PartitionWordsByLength");
 
     // {} here is required for preserving type information.
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
index ba2a94e6a..6255a9333 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -53,7 +53,7 @@ public static void main(final String[] args) {
     final String inputFilePath = args[0];
     final String outputFilePath = args[1];
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("PerKeyMedian");
 
     final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
index 39b941ab9..486442bf8 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
@@ -20,7 +20,7 @@
 
 import com.google.common.collect.Lists;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -52,7 +52,7 @@ public static void main(final String[] args) {
     final String inputFilePath = args[0];
     final String outputFilePath = args[1];
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("PerPercentileAverage");
 
     final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
index b476460a3..6fa8f5ee7 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
@@ -26,7 +26,7 @@
 import org.apache.beam.sdk.transforms.*;
 import org.apache.beam.sdk.values.*;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -50,7 +50,7 @@ public static void main(final String[] args) {
     final String outputFilePath = args[0];
 
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("SimpleSumSQL");
     final Pipeline p = Pipeline.create(options);
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
index 30ee405cd..5bff5d8ec 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
@@ -28,7 +28,7 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -62,7 +62,7 @@ public static void main(final String[] args) {
       .every(Duration.standardSeconds(1)));
 
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("WindowedBroadcast");
 
     final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
index 0f13dc465..a814165e0 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
@@ -29,7 +29,7 @@
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -106,7 +106,7 @@ public static void main(final String[] args) {
     }
 
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("WindowedWordCount");
 
     final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
index 9af9d7c8d..ba3cb8002 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -45,7 +45,7 @@ public static void main(final String[] args) {
     final String inputFilePath = args[0];
     final String outputFilePath = args[1];
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("WordCount");
 
     final Pipeline p = Pipeline.create(options);
diff --git a/pom.xml b/pom.xml
index 7ed51fe03..132c5b043 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,7 @@ under the License.
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <auto-service.version>1.0-rc2</auto-service.version>
         <beam.version>2.6.0</beam.version>
         <spark.version>2.2.0</spark.version>
         <scala.version>2.11.8</scala.version>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services