You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/07/10 07:13:08 UTC

[incubator-nemo] branch master updated: [NEMO-71] Add NodeNamesAssignmentPass and Example Application (#62)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 64a0912  [NEMO-71] Add NodeNamesAssignmentPass and Example Application (#62)
64a0912 is described below

commit 64a09120e28e0817842d1861bed6e9289b8cd348
Author: Jangho Seo <ja...@jangho.io>
AuthorDate: Tue Jul 10 16:13:04 2018 +0900

    [NEMO-71] Add NodeNamesAssignmentPass and Example Application (#62)
    
    JIRA: [NEMO-71: Add LocationShareAssignmentPass and Example Application](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-71)
    
    **Major changes:**
    - Added NodeNamesAssignmentPass, which computes and assigns appropriate share of nodes to each vertex.
    - Add join example (NetworkTraceAnalysis).
    
    **Minor changes to note:**
    - N/A
    
    **Tests for the changes:**
    - Added integration test (NetworkTraceAnalysisITCase)
    
    **Other comments:**
    - N/A
    
    resolves [NEMO-71](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-71)
---
 .../main/java/edu/snu/nemo/client/JobLauncher.java |  31 ++-
 .../executionproperty/NodeNamesProperty.java       |  42 ++++
 compiler/optimizer/pom.xml                         |  15 ++
 .../annotating/NodeNamesAssignmentPass.java        | 255 +++++++++++++++++++++
 .../composite/PrimitiveCompositePass.java          |   1 +
 conf/src/main/java/edu/snu/nemo/conf/JobConf.java  |  19 +-
 .../nemo/examples/beam/NetworkTraceAnalysis.java   | 138 +++++++++++
 .../examples/beam/NetworkTraceAnalysisITCase.java  |  87 +++++++
 examples/resources/sample_input_network0           |   8 +
 examples/resources/sample_input_network1           |   8 +
 examples/resources/test_output_network             |   2 +
 pom.xml                                            |   1 +
 .../main/java/edu/snu/nemo/driver/NemoDriver.java  |   6 +-
 .../scheduler/NodeShareSchedulingConstraint.java   |  61 +++++
 .../scheduler/SchedulingConstraintRegistry.java    |   4 +-
 .../optimizer/policy/PolicyBuilderTest.java        |   6 +-
 16 files changed, 667 insertions(+), 17 deletions(-)

diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index b7cc03e..6826174 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -33,6 +33,7 @@ import org.apache.reef.io.network.util.StringIdentifierFactory;
 import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
 import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
 import org.apache.reef.tang.*;
+import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.tang.formats.CommandLine;
 import org.apache.reef.util.EnvironmentUtils;
@@ -95,12 +96,15 @@ public final class JobLauncher {
     final Configuration driverConf = getDriverConf(builtJobConf);
     final Configuration driverNcsConf = getDriverNcsConf();
     final Configuration driverMessageConfg = getDriverMessageConf();
-    final Configuration executorResourceConfig = getExecutorResourceConf(builtJobConf);
+    final Configuration executorResourceConfig = getJSONConf(builtJobConf, JobConf.ExecutorJSONPath.class,
+        JobConf.ExecutorJSONContents.class);
+    final Configuration bandwidthConfig = getJSONConf(builtJobConf, JobConf.BandwidthJSONPath.class,
+        JobConf.BandwidthJSONContents.class);
     final Configuration clientConf = getClientConf();
 
     // Merge Job and Driver Confs
     jobAndDriverConf = Configurations.merge(builtJobConf, driverConf, driverNcsConf, driverMessageConfg,
-        executorResourceConfig, driverRPCServer.getListeningConfiguration());
+        executorResourceConfig, bandwidthConfig, driverRPCServer.getListeningConfiguration());
 
     // Get DeployMode Conf
     deployModeConf = Configurations.merge(getDeployModeConf(builtJobConf), clientConf);
@@ -237,7 +241,8 @@ public final class JobLauncher {
     cl.registerShortNameOfClass(JobConf.OptimizationPolicy.class);
     cl.registerShortNameOfClass(JobConf.DeployMode.class);
     cl.registerShortNameOfClass(JobConf.DriverMemMb.class);
-    cl.registerShortNameOfClass(JobConf.ExecutorJsonPath.class);
+    cl.registerShortNameOfClass(JobConf.ExecutorJSONPath.class);
+    cl.registerShortNameOfClass(JobConf.BandwidthJSONPath.class);
     cl.registerShortNameOfClass(JobConf.JVMHeapSlack.class);
     cl.registerShortNameOfClass(JobConf.IORequestHandleThreadsTotal.class);
     cl.registerShortNameOfClass(JobConf.MaxScheduleAttempt.class);
@@ -277,19 +282,25 @@ public final class JobLauncher {
   }
 
   /**
-   * Get executor resource configuration.
+   * Read json file and return its contents as configuration parameter.
    *
-   * @param jobConf job configuration to get executor json path.
-   * @return executor resource configuration.
+   * @param jobConf job configuration to get json path.
+   * @param pathParameter named parameter that represents the path to the json file, or an empty string
+   * @param contentsParameter named parameter that represents the contents of the file
+   * @return configuration with contents of the file, or an empty string as value for {@code contentsParameter}
    * @throws InjectionException exception while injection.
    */
-  private static Configuration getExecutorResourceConf(final Configuration jobConf) throws InjectionException {
+  private static Configuration getJSONConf(final Configuration jobConf,
+                                           final Class<? extends Name<String>> pathParameter,
+                                           final Class<? extends Name<String>> contentsParameter)
+      throws InjectionException {
     final Injector injector = TANG.newInjector(jobConf);
     try {
-      final String path = injector.getNamedInstance(JobConf.ExecutorJsonPath.class);
-      final String contents = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
+      final String path = injector.getNamedInstance(pathParameter);
+      final String contents = path.isEmpty() ? ""
+          : new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
       return TANG.newConfigurationBuilder()
-          .bindNamedParameter(JobConf.ExecutorJsonContents.class, contents)
+          .bindNamedParameter(contentsParameter, contents)
           .build();
     } catch (final IOException e) {
       throw new RuntimeException(e);
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/NodeNamesProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/NodeNamesProperty.java
new file mode 100644
index 0000000..82cddd9
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/NodeNamesProperty.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.executionproperty;
+
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+
+import java.util.HashMap;
+
+/**
+ * Map from node name to the number of parallel Tasks of the vertex which will run on that node.
+ */
+public final class NodeNamesProperty extends VertexExecutionProperty<HashMap<String, Integer>> {
+    /**
+     * Default constructor.
+     * @param value the map from node name to the number of Tasks that must be executed on that node
+     */
+    public NodeNamesProperty(final HashMap<String, Integer> value) {
+        super(value);
+    }
+
+    /**
+     * Static method for constructing {@link NodeNamesProperty}.
+     * @param value the map from node name to the number of Tasks that must be executed on that node
+     * @return the execution property wrapping the given map
+     */
+    public static NodeNamesProperty of(final HashMap<String, Integer> value) {
+        return new NodeNamesProperty(value);
+    }
+}
diff --git a/compiler/optimizer/pom.xml b/compiler/optimizer/pom.xml
index 1c65254..8195526 100644
--- a/compiler/optimizer/pom.xml
+++ b/compiler/optimizer/pom.xml
@@ -37,6 +37,11 @@ limitations under the License.
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+            <version>${commons-math.version}</version>
+        </dependency>
+        <dependency>
             <groupId>edu.snu.nemo</groupId>
             <artifactId>nemo-common</artifactId>
             <version>${project.version}</version>
@@ -46,5 +51,15 @@ limitations under the License.
             <artifactId>nemo-runtime-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java
new file mode 100644
index 0000000..8c6d8a1
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.NodeNamesProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.commons.math3.optim.BaseOptimizer;
+import org.apache.commons.math3.optim.PointValuePair;
+import org.apache.commons.math3.optim.linear.*;
+import org.apache.commons.math3.optim.nonlinear.scalar.GoalType;
+import org.apache.commons.math3.util.Incrementor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.*;
+
+/**
+ * Computes and assigns an appropriate share of nodes to each irVertex to minimize shuffle time, with respect to
+ * the bandwidth restrictions of nodes. Without bandwidth information, every irVertex gets an empty map.
+ * This pass follows the task assignment of Iridium-style optimization.
+ * http://pages.cs.wisc.edu/~akella/papers/gda-sigcomm15.pdf
+ *
+ * <h3>Assumptions</h3>
+ * This pass assumes no skew in input or intermediate data, so that the number of Tasks assigned to a node
+ * is proportional to the data size handled by the node.
+ * Also, this pass assumes stages with an empty map as {@link NodeNamesProperty} are assigned to nodes evenly.
+ * This does not hold if, for example, source splits are distributed unevenly, because any source
+ * location-aware scheduling policy will then assign TaskGroups unevenly.
+ * Also, this pass assumes network bandwidth to be the bottleneck. Each node should have enough capacity to run
+ * TaskGroups immediately when the scheduler attempts to schedule a TaskGroup.
+ */
+public final class NodeNamesAssignmentPass extends AnnotatingPass {
+
+  // Index of the objective parameter, in the coefficient vector
+  private static final int OBJECTIVE_COEFFICIENT_INDEX = 0;
+  private static final Logger LOG = LoggerFactory.getLogger(NodeNamesAssignmentPass.class);
+  private static final HashMap<String, Integer> EMPTY_MAP = new HashMap<>();
+
+  private static String bandwidthSpecificationString = "";
+
+
+  /**
+   * Default constructor.
+   */
+  public NodeNamesAssignmentPass() {
+    super(NodeNamesProperty.class, Collections.singleton(ParallelismProperty.class));
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    if (bandwidthSpecificationString.isEmpty()) {
+      dag.topologicalDo(irVertex -> irVertex.setProperty(NodeNamesProperty.of(EMPTY_MAP)));
+    } else {
+      assignNodeShares(dag, BandwidthSpecification.fromJsonString(bandwidthSpecificationString));
+    }
+    return dag;
+  }
+
+  public static void setBandwidthSpecificationString(final String value) {
+    bandwidthSpecificationString = value;
+  }
+
+  /**
+   * Annotates each vertex, in topological order, with its Task count per node: roots get an even split,
+   * vertices with a single OneToOne inEdge inherit the parent's shares, others use {@code optimize}.
+   * @param dag the DAG to annotate; every vertex must have a {@link ParallelismProperty}
+   * @param bandwidthSpecification the node names with their uplink and downlink bandwidths
+   */
+  private static void assignNodeShares(final DAG<IRVertex, IREdge> dag,
+                                       final BandwidthSpecification bandwidthSpecification) {
+    final List<String> nodes = bandwidthSpecification.getNodes();
+    dag.topologicalDo(irVertex -> {
+      final Collection<IREdge> inEdges = dag.getIncomingEdgesOf(irVertex);
+      final int parallelism = irVertex.getPropertyValue(ParallelismProperty.class)
+          .orElseThrow(() -> new RuntimeException("Parallelism property required"));
+      if (inEdges.size() == 0) {
+        // Root vertex: fall back to an even distribution; the first nodes take one extra Task each.
+        final HashMap<String, Integer> shares = new HashMap<>();
+        final int defaultShare = parallelism / nodes.size();
+        final int remainder = parallelism % nodes.size();
+        for (int i = 0; i < nodes.size(); i++) {
+          shares.put(nodes.get(i), defaultShare + (i < remainder ? 1 : 0));
+        }
+        irVertex.getExecutionProperties().put(NodeNamesProperty.of(shares));
+      } else if (isOneToOneEdge(inEdges)) {
+        // NodeNamesProperty is a vertex property, so read it from the parent vertex rather than from the edge.
+        final Optional<NodeNamesProperty> property = inEdges.iterator().next().getSrc()
+            .getExecutionProperties().get(NodeNamesProperty.class);
+        irVertex.getExecutionProperties().put(property.get());
+      } else {
+        // This IRVertex has shuffle inEdge(s), or has multiple inEdges: sum the shares of all parents.
+        final Map<String, Integer> parentLocationShares = new HashMap<>();
+        for (final IREdge inEdge : inEdges) {
+          final Map<String, Integer> shares = inEdge.getSrc().getPropertyValue(NodeNamesProperty.class).get();
+          for (final Map.Entry<String, Integer> element : shares.entrySet()) {
+            parentLocationShares.merge(element.getKey(), element.getValue(), Integer::sum);
+          }
+        }
+        final double[] ratios = optimize(bandwidthSpecification, parentLocationShares);
+        final HashMap<String, Integer> shares = new HashMap<>();
+        for (int i = 0; i < nodes.size(); i++) {
+          shares.put(nodes.get(i), (int) (ratios[i] * parallelism));
+        }
+        // The truncation above may leave Tasks unassigned; hand them out one per node while any remain.
+        int remainder = parallelism - shares.values().stream().mapToInt(i -> i).sum();
+        for (final Iterator<String> it = shares.keySet().iterator(); remainder > 0 && it.hasNext(); remainder--) {
+          final String nodeName = it.next();
+          shares.put(nodeName, shares.get(nodeName) + 1);
+        }
+        irVertex.getExecutionProperties().put(NodeNamesProperty.of(shares));
+      }
+    });
+  }
+
+  /**
+   * @param inEdges list of inEdges to the specific irVertex
+   * @return true if and only if the irVertex has exactly one inEdge and that edge is OneToOne
+   */
+  private static boolean isOneToOneEdge(final Collection<IREdge> inEdges) {
+    return inEdges.size() == 1 && inEdges.iterator().next()
+          .getPropertyValue(DataCommunicationPatternProperty.class).get()
+          .equals(DataCommunicationPatternProperty.Value.OneToOne);
+  }
+
+  /**
+   * Computes share of parallelism that each node is responsible for.
+   * @param bandwidthSpecification provides bandwidth information between nodes
+   * @param parentNodeShares shares of parallelism for the parent vertex
+   * @return array of fractions of parallelism that each node is responsible for
+   */
+  private static double[] optimize(final BandwidthSpecification bandwidthSpecification,
+                                   final Map<String, Integer> parentNodeShares) {
+    final int parentParallelism = parentNodeShares.values().stream().mapToInt(i -> i).sum();
+    final List<String> nodeNames = bandwidthSpecification.getNodes();
+    final List<LinearConstraint> constraints = new ArrayList<>();
+    final int coefficientVectorSize = nodeNames.size() + 1;
+
+    for (int i = 0; i < nodeNames.size(); i++) {
+      final String nodeName = nodeNames.get(i);
+      final int nodeCoefficientIndex = i + 1;
+      final int parentParallelismOnThisLocation = parentNodeShares.get(nodeName);
+
+      // Upload bandwidth
+      final double[] uploadCoefficientVector = new double[coefficientVectorSize];
+      uploadCoefficientVector[OBJECTIVE_COEFFICIENT_INDEX] = bandwidthSpecification.up(nodeName);
+      uploadCoefficientVector[nodeCoefficientIndex] = parentParallelismOnThisLocation;
+      constraints.add(new LinearConstraint(uploadCoefficientVector, Relationship.GEQ,
+          parentParallelismOnThisLocation));
+
+      // Download bandwidth
+      final double[] downloadCoefficientVector = new double[coefficientVectorSize];
+      downloadCoefficientVector[OBJECTIVE_COEFFICIENT_INDEX] = bandwidthSpecification.down(nodeName);
+      downloadCoefficientVector[nodeCoefficientIndex] = parentParallelismOnThisLocation - parentParallelism;
+      constraints.add(new LinearConstraint(downloadCoefficientVector, Relationship.GEQ, 0));
+
+      // The coefficient is non-negative
+      final double[] nonNegativeCoefficientVector = new double[coefficientVectorSize];
+      nonNegativeCoefficientVector[nodeCoefficientIndex] = 1;
+      constraints.add(new LinearConstraint(nonNegativeCoefficientVector, Relationship.GEQ, 0));
+    }
+
+    // The sum of all coefficients is 1
+    final double[] sumCoefficientVector = new double[coefficientVectorSize];
+    for (int i = 0; i < nodeNames.size(); i++) {
+      sumCoefficientVector[OBJECTIVE_COEFFICIENT_INDEX + 1 + i] = 1;
+    }
+    constraints.add(new LinearConstraint(sumCoefficientVector, Relationship.EQ, 1));
+
+    // Objective
+    final double[] objectiveCoefficientVector = new double[coefficientVectorSize];
+    objectiveCoefficientVector[OBJECTIVE_COEFFICIENT_INDEX] = 1;
+    final LinearObjectiveFunction objectiveFunction = new LinearObjectiveFunction(objectiveCoefficientVector, 0);
+
+    // Solve
+    try {
+      final SimplexSolver solver = new SimplexSolver();
+      final Field iterations = BaseOptimizer.class.getDeclaredField("iterations");
+      iterations.setAccessible(true);
+      final Incrementor incrementor = (Incrementor) iterations.get(solver);
+      incrementor.setMaximalCount(2147483647);
+      LOG.info(String.format("Max iterations: %d", solver.getMaxIterations()));
+      final PointValuePair solved = solver.optimize(
+          new LinearConstraintSet(constraints), objectiveFunction, GoalType.MINIMIZE);
+
+      return Arrays.copyOfRange(solved.getPoint(), OBJECTIVE_COEFFICIENT_INDEX + 1, coefficientVectorSize);
+    } catch (final NoSuchFieldException | IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Bandwidth specification.
+   */
+  private static final class BandwidthSpecification {
+    private final List<String> nodeNames = new ArrayList<>();
+    private final Map<String, Integer> uplinkBandwidth = new HashMap<>();
+    private final Map<String, Integer> downlinkBandwidth = new HashMap<>();
+
+    private BandwidthSpecification() {
+    }
+
+    static BandwidthSpecification fromJsonString(final String jsonString) {
+      final BandwidthSpecification specification = new BandwidthSpecification();
+      try {
+        final ObjectMapper objectMapper = new ObjectMapper();
+        final TreeNode jsonRootNode = objectMapper.readTree(jsonString);
+        for (int i = 0; i < jsonRootNode.size(); i++) {
+          final TreeNode locationNode = jsonRootNode.get(i);
+          final String name = locationNode.get("name").traverse().nextTextValue();
+          final int up = locationNode.get("up").traverse().getIntValue();
+          final int down = locationNode.get("down").traverse().getIntValue();
+          specification.nodeNames.add(name);
+          specification.uplinkBandwidth.put(name, up);
+          specification.downlinkBandwidth.put(name, down);
+        }
+      } catch (final IOException e) {
+        throw new RuntimeException(e);
+      }
+      return specification;
+    }
+
+    int up(final String nodeName) {
+      return uplinkBandwidth.get(nodeName);
+    }
+
+    int down(final String nodeName) {
+      return downlinkBandwidth.get(nodeName);
+    }
+
+    List<String> getNodes() {
+      return nodeNames;
+    }
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
index a30abec..dc8abfc 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
@@ -39,6 +39,7 @@ public final class PrimitiveCompositePass extends CompositePass {
         new CompressionPass(),
         new DecompressionPass(),
         new SourceLocationAwareSchedulingPass(),
+        new NodeNamesAssignmentPass(),
         new ExecutorSlotCompliancePass()
     ));
   }
diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index 1e4ef4d..bed8099 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -116,11 +116,26 @@ public final class JobConf extends ConfigurationModuleBuilder {
   }
 
   /**
+   * Path to the JSON file that specifies bandwidth between locations.
+   */
+  @NamedParameter(doc = "Path to the JSON file that specifies bandwidth between locations",
+      short_name = "bandwidth_json", default_value = "")
+  public final class BandwidthJSONPath implements Name<String> {
+  }
+
+  /**
    * Path to the JSON file that specifies resource layout.
    */
   @NamedParameter(doc = "Path to the JSON file that specifies resources for executors", short_name = "executor_json",
       default_value = "examples/resources/sample_executor_resources.json")
-  public final class ExecutorJsonPath implements Name<String> {
+  public final class ExecutorJSONPath implements Name<String> {
+  }
+
+  /**
+   * Contents of the JSON file that specifies bandwidth between locations.
+   */
+  @NamedParameter(doc = "Contents of JSON file that specifies bandwidth between locations")
+  public final class BandwidthJSONContents implements Name<String> {
   }
 
   /**
@@ -135,7 +150,7 @@ public final class JobConf extends ConfigurationModuleBuilder {
    * Contents of the JSON file that specifies resource layout.
    */
   @NamedParameter(doc = "Contents of JSON file that specifies resources for executors")
-  public final class ExecutorJsonContents implements Name<String> {
+  public final class ExecutorJSONContents implements Name<String> {
   }
 
   /**
diff --git a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysis.java b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysis.java
new file mode 100644
index 0000000..1f2f990
--- /dev/null
+++ b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysis.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam;
+
+import edu.snu.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import edu.snu.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * An app that analyzes data flow from network trace.
+ * Each line in the output file represents a host, containing the standard deviation of the lengths of packets
+ * that flow into the host (read from the input0 file), and the standard deviation of the lengths of packets
+ * that flow out from the host (read from the input1 file).
+ */
+public final class NetworkTraceAnalysis {
+  /**
+   * Private constructor.
+   */
+  private NetworkTraceAnalysis() {
+  }
+
+  /**
+   * Main function for the Beam program.
+   * @param args arguments.
+   */
+  public static void main(final String[] args) {
+    final String input0FilePath = args[0];
+    final String input1FilePath = args[1];
+    final String outputFilePath = args[2];
+    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+    options.setRunner(NemoPipelineRunner.class);
+    options.setJobName("NetworkTraceAnalysis");
+
+    // Given "4 0.0 192.168.3.1 -> 192.168.0.2 Len=29", this finds "192.168.3.1", "192.168.0.2" and "29"
+    final Pattern pattern = Pattern.compile(" *\\d+ +[0-9.]+ +([0-9.]+) -> ([0-9.]+) +.*Len=(\\d+)");
+
+    final SimpleFunction<String, Boolean> filter = new SimpleFunction<String, Boolean>() {
+      @Override
+      public Boolean apply(final String line) {
+        return pattern.matcher(line).find();
+      }
+    };
+    final SimpleFunction<KV<String, Iterable<KV<String, Long>>>, KV<String, Long>> mapToStdev
+        = new SimpleFunction<KV<String, Iterable<KV<String, Long>>>, KV<String, Long>>() {
+      @Override
+      public KV<String, Long> apply(final KV<String, Iterable<KV<String, Long>>> kv) {
+        return KV.of(kv.getKey(), stdev(kv.getValue()));
+      }
+    };
+
+    final Pipeline p = Pipeline.create(options);
+    final PCollection<KV<String, Long>> in0 = GenericSourceSink.read(p, input0FilePath)
+        .apply(Filter.by(filter))
+        .apply(MapElements.via(new SimpleFunction<String, KV<String, KV<String, Long>>>() {
+          @Override
+          public KV<String, KV<String, Long>> apply(final String line) {
+            final Matcher matcher = pattern.matcher(line);
+            matcher.find();
+            return KV.of(matcher.group(2), KV.of(matcher.group(1), Long.valueOf(matcher.group(3))));
+          }
+        }))
+        .apply(GroupByKey.create())
+        .apply(MapElements.via(mapToStdev));
+    final PCollection<KV<String, Long>> in1 = GenericSourceSink.read(p, input1FilePath)
+        .apply(Filter.by(filter))
+        .apply(MapElements.via(new SimpleFunction<String, KV<String, KV<String, Long>>>() {
+          @Override
+          public KV<String, KV<String, Long>> apply(final String line) {
+            final Matcher matcher = pattern.matcher(line);
+            matcher.find();
+            return KV.of(matcher.group(1), KV.of(matcher.group(2), Long.valueOf(matcher.group(3))));
+          }
+        }))
+        .apply(GroupByKey.create())
+        .apply(MapElements.via(mapToStdev));
+    final TupleTag<Long> tag0 = new TupleTag<>();
+    final TupleTag<Long> tag1 = new TupleTag<>();
+    final PCollection<KV<String, CoGbkResult>> joined =
+        KeyedPCollectionTuple.of(tag0, in0).and(tag1, in1).apply(CoGroupByKey.create());
+    final PCollection<String> result = joined
+        .apply(MapElements.via(new SimpleFunction<KV<String, CoGbkResult>, String>() {
+          @Override
+          public String apply(final KV<String, CoGbkResult> kv) {
+            final long source = getLong(kv.getValue().getAll(tag0));
+            final long destination = getLong(kv.getValue().getAll(tag1));
+            final String intermediate = kv.getKey();
+            return new StringBuilder(intermediate).append(",").append(source).append(",")
+                .append(destination).toString();
+          }
+        }));
+    GenericSourceSink.write(result, outputFilePath);
+    p.run();
+  }
+
+  private static long getLong(final Iterable<Long> data) {
+    for (final long datum : data) {
+      return datum;
+    }
+    return 0;
+  }
+
+  private static long stdev(final Iterable<KV<String, Long>> data) {
+    final StandardDeviation stdev = new StandardDeviation();
+    final List<Long> elements = new ArrayList<>();
+    for (final KV<String, Long> e : data) {
+      elements.add(e.getValue());
+    }
+    return Math.round(stdev.evaluate(elements.stream().mapToDouble(e -> e).toArray()));
+  }
+}
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysisITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysisITCase.java
new file mode 100644
index 0000000..f5e344c
--- /dev/null
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysisITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam;
+
+import edu.snu.nemo.client.JobLauncher;
+import edu.snu.nemo.common.test.ArgBuilder;
+import edu.snu.nemo.common.test.ExampleTestUtil;
+import edu.snu.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
+import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismFive;
+import edu.snu.nemo.examples.beam.policy.SailfishPolicyParallelismFive;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Integration test that runs the {@link NetworkTraceAnalysis} example through {@link JobLauncher},
+ * once per optimization policy.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class NetworkTraceAnalysisITCase {
+  private static final int TIMEOUT = 120_000;
+  private static final String RESOURCE_DIR = System.getProperty("user.dir") + "/../resources/";
+  private static final String EXECUTOR_RESOURCES = RESOURCE_DIR + "beam_sample_executor_resources.json";
+  private static final String FIRST_INPUT = RESOURCE_DIR + "sample_input_network0";
+  private static final String SECOND_INPUT = RESOURCE_DIR + "sample_input_network1";
+  private static final String OUTPUT_NAME = "sample_output_network";
+  private static final String EXPECTED_OUTPUT_NAME = "test_output_network";
+  private static final String JOB_ID = NetworkTraceAnalysisITCase.class.getSimpleName();
+
+  private static ArgBuilder builder;
+
+  @Before
+  public void setUp() throws Exception {
+    // Arguments shared by every test; each test appends its own job id and policy.
+    builder = new ArgBuilder()
+        .addResourceJson(EXECUTOR_RESOURCES)
+        .addUserMain(NetworkTraceAnalysis.class.getCanonicalName())
+        .addUserArgs(FIRST_INPUT, SECOND_INPUT, RESOURCE_DIR + OUTPUT_NAME);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // The finally block deletes the output file even when the validity check throws.
+    try {
+      ExampleTestUtil.ensureOutputValidity(RESOURCE_DIR, OUTPUT_NAME, EXPECTED_OUTPUT_NAME);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(RESOURCE_DIR, OUTPUT_NAME);
+    }
+  }
+
+  @Test(timeout = TIMEOUT)
+  public void test() throws Exception {
+    launch(JOB_ID, DefaultPolicyParallelismFive.class);
+  }
+
+  @Test(timeout = TIMEOUT)
+  public void testSailfish() throws Exception {
+    launch(JOB_ID + "_sailfish", SailfishPolicyParallelismFive.class);
+  }
+
+  @Test(timeout = TIMEOUT)
+  public void testPado() throws Exception {
+    launch(JOB_ID + "_pado", PadoPolicyParallelismFive.class);
+  }
+
+  /** Completes the shared arguments with {@code jobId} and {@code policy}, then calls the launcher. */
+  private static void launch(final String jobId, final Class<?> policy) throws Exception {
+    JobLauncher.main(builder.addJobId(jobId).addOptimizationPolicy(policy.getCanonicalName()).build());
+  }
+}
diff --git a/examples/resources/sample_input_network0 b/examples/resources/sample_input_network0
new file mode 100644
index 0000000..55176d7
--- /dev/null
+++ b/examples/resources/sample_input_network0
@@ -0,0 +1,8 @@
+1 0.0 192.168.0.1 -> 192.168.0.2 Len=32
+2 0.0 192.168.1.1 -> 192.168.0.2 Len=31
+3 0.0 192.168.2.1 -> 192.168.0.2 Len=30
+4 0.0 192.168.3.1 -> 192.168.0.2 Len=29
+5 0.0 192.168.4.1 -> 192.168.0.2 Len=28
+6 0.0 192.168.5.1 -> 192.168.0.3 Len=27
+7 0.0 192.168.5.1 -> 192.168.0.3 Len=26
+8 0.0 192.168.1.1 -> 192.168.0.3 Len=25
diff --git a/examples/resources/sample_input_network1 b/examples/resources/sample_input_network1
new file mode 100644
index 0000000..f126482
--- /dev/null
+++ b/examples/resources/sample_input_network1
@@ -0,0 +1,8 @@
+1 0.0 192.168.0.2 -> 192.168.1.10 Len=32
+2 0.0 192.168.0.2 -> 192.168.1.10 Len=31
+3 0.0 192.168.0.2 -> 192.168.2.10 Len=30
+4 0.0 192.168.0.2 -> 192.168.2.10 Len=29
+5 0.0 192.168.0.2 -> 192.168.3.10 Len=16
+6 0.0 192.168.0.3 -> 192.168.3.10 Len=15
+7 0.0 192.168.0.3 -> 192.168.4.10 Len=14
+8 0.0 192.168.0.3 -> 192.168.4.10 Len=13
diff --git a/examples/resources/test_output_network b/examples/resources/test_output_network
new file mode 100644
index 0000000..a6f4115
--- /dev/null
+++ b/examples/resources/test_output_network
@@ -0,0 +1,2 @@
+192.168.0.2,2,7
+192.168.0.3,1,1
diff --git a/pom.xml b/pom.xml
index 42ad7ab..aa8837d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,6 +44,7 @@ limitations under the License.
         <netty.version>4.1.16.Final</netty.version>
         <jetty-server.version>9.4.10.v20180503</jetty-server.version>
         <jetty-servlet.version>9.4.10.v20180503</jetty-servlet.version>
+        <commons-math.version>3.6.1</commons-math.version>
         <slf4j.version>1.7.20</slf4j.version>
         <!-- Tests -->
         <mockito.version>2.13.0</mockito.version>
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
index d6c6ccc..f4493e8 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
@@ -16,6 +16,7 @@
 package edu.snu.nemo.driver;
 
 import edu.snu.nemo.common.ir.IdManager;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.NodeNamesAssignmentPass;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
@@ -81,7 +82,8 @@ public final class NemoDriver {
                      final LocalAddressProvider localAddressProvider,
                      final JobMessageObserver client,
                      final ClientRPC clientRPC,
-                     @Parameter(JobConf.ExecutorJsonContents.class) final String resourceSpecificationString,
+                     @Parameter(JobConf.ExecutorJSONContents.class) final String resourceSpecificationString,
+                     @Parameter(JobConf.BandwidthJSONContents.class) final String bandwidthString,
                      @Parameter(JobConf.JobId.class) final String jobId,
                      @Parameter(JobConf.FileDirectory.class) final String localDirectory,
                      @Parameter(JobConf.GlusterVolumeDirectory.class) final String glusterDirectory) {
@@ -96,6 +98,8 @@ public final class NemoDriver {
     this.glusterDirectory = glusterDirectory;
     this.handler = new RemoteClientMessageLoggingHandler(client);
     this.clientRPC = clientRPC;
+    // TODO #69: Support job-wide execution property
+    NodeNamesAssignmentPass.setBandwidthSpecificationString(bandwidthString);
     clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.LaunchDAG,
         message -> startSchedulingUserApplication(message.getLaunchDAG().getDag()));
     // Send DriverStarted message to the client
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
new file mode 100644
index 0000000..584cd67
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.NodeNamesProperty;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.plan.Task;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+
+import javax.inject.Inject;
+import java.util.*;
+
+/**
+ * Admits a task only on the node {@link NodeNamesProperty} assigns to it; an empty property admits any executor.
+ */
+@AssociatedProperty(NodeNamesProperty.class)
+public final class NodeShareSchedulingConstraint implements SchedulingConstraint {
+  @Inject
+  private NodeShareSchedulingConstraint() {
+  }
+
+  /**
+   * Walks the nodes in sorted name order; each node owns as many consecutive task indices as its share.
+   * @param propertyValue node name to the number of task indices (share) that node owns
+   * @param taskIndex zero-based index of the task to place
+   * @return name of the node whose index range contains {@code taskIndex}
+   * @throws IllegalStateException if {@code taskIndex} is not below the sum of all shares
+   */
+  private String getNodeName(final Map<String, Integer> propertyValue, final int taskIndex) {
+    int index = taskIndex;
+    for (final Map.Entry<String, Integer> share : new TreeMap<>(propertyValue).entrySet()) {
+      if (index < share.getValue()) {
+        return share.getKey();
+      }
+      index -= share.getValue();
+    }
+    throw new IllegalStateException("Detected excessive parallelism which NodeNamesProperty does not cover");
+  }
+
+  @Override
+  public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
+    final Map<String, Integer> propertyValue = task.getPropertyValue(NodeNamesProperty.class)
+            .orElseThrow(() -> new RuntimeException("NodeNamesProperty expected"));
+    return propertyValue.isEmpty() || executor.getNodeName().equals(
+        getNodeName(propertyValue, RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId())));
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
index 97ab554..639e775 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
@@ -39,10 +39,12 @@ public final class SchedulingConstraintRegistry {
   private SchedulingConstraintRegistry(
       final ContainerTypeAwareSchedulingConstraint containerTypeAwareSchedulingConstraint,
       final FreeSlotSchedulingConstraint freeSlotSchedulingConstraint,
-      final SourceLocationAwareSchedulingConstraint sourceLocationAwareSchedulingConstraint) {
+      final SourceLocationAwareSchedulingConstraint sourceLocationAwareSchedulingConstraint,
+      final NodeShareSchedulingConstraint nodeShareSchedulingConstraint) {
     registerSchedulingConstraint(containerTypeAwareSchedulingConstraint);
     registerSchedulingConstraint(freeSlotSchedulingConstraint);
     registerSchedulingConstraint(sourceLocationAwareSchedulingConstraint);
+    registerSchedulingConstraint(nodeShareSchedulingConstraint);
   }
 
   /**
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
index 41edef3..b6ac067 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -28,21 +28,21 @@ public final class PolicyBuilderTest {
   @Test
   public void testDisaggregationPolicy() {
     final Policy disaggregationPolicy = new DisaggregationPolicy();
-    assertEquals(16, disaggregationPolicy.getCompileTimePasses().size());
+    assertEquals(17, disaggregationPolicy.getCompileTimePasses().size());
     assertEquals(0, disaggregationPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testPadoPolicy() {
     final Policy padoPolicy = new PadoPolicy();
-    assertEquals(18, padoPolicy.getCompileTimePasses().size());
+    assertEquals(19, padoPolicy.getCompileTimePasses().size());
     assertEquals(0, padoPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testDataSkewPolicy() {
     final Policy dataSkewPolicy = new DataSkewPolicy();
-    assertEquals(20, dataSkewPolicy.getCompileTimePasses().size());
+    assertEquals(21, dataSkewPolicy.getCompileTimePasses().size());
     assertEquals(1, dataSkewPolicy.getRuntimePasses().size());
   }