You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/07/10 07:13:08 UTC
[incubator-nemo] branch master updated: [NEMO-71] Add
NodeNamesAssignmentPass and Example Application (#62)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 64a0912 [NEMO-71] Add NodeNamesAssignmentPass and Example Application (#62)
64a0912 is described below
commit 64a09120e28e0817842d1861bed6e9289b8cd348
Author: Jangho Seo <ja...@jangho.io>
AuthorDate: Tue Jul 10 16:13:04 2018 +0900
[NEMO-71] Add NodeNamesAssignmentPass and Example Application (#62)
JIRA: [NEMO-71: Add LocationShareAssignmentPass and Example Application](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-71)
**Major changes:**
- Added NodeNamesAssignmentPass, which computes and assigns an appropriate share of nodes to each vertex.
- Added a join example (NetworkTraceAnalysis).
**Minor changes to note:**
- N/A
**Tests for the changes:**
- Added integration test (NetworkTraceAnalysisITCase)
**Other comments:**
- N/A
resolves [NEMO-71](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-71)
---
.../main/java/edu/snu/nemo/client/JobLauncher.java | 31 ++-
.../executionproperty/NodeNamesProperty.java | 42 ++++
compiler/optimizer/pom.xml | 15 ++
.../annotating/NodeNamesAssignmentPass.java | 255 +++++++++++++++++++++
.../composite/PrimitiveCompositePass.java | 1 +
conf/src/main/java/edu/snu/nemo/conf/JobConf.java | 19 +-
.../nemo/examples/beam/NetworkTraceAnalysis.java | 138 +++++++++++
.../examples/beam/NetworkTraceAnalysisITCase.java | 87 +++++++
examples/resources/sample_input_network0 | 8 +
examples/resources/sample_input_network1 | 8 +
examples/resources/test_output_network | 2 +
pom.xml | 1 +
.../main/java/edu/snu/nemo/driver/NemoDriver.java | 6 +-
.../scheduler/NodeShareSchedulingConstraint.java | 61 +++++
.../scheduler/SchedulingConstraintRegistry.java | 4 +-
.../optimizer/policy/PolicyBuilderTest.java | 6 +-
16 files changed, 667 insertions(+), 17 deletions(-)
diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index b7cc03e..6826174 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -33,6 +33,7 @@ import org.apache.reef.io.network.util.StringIdentifierFactory;
import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
import org.apache.reef.tang.*;
+import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.CommandLine;
import org.apache.reef.util.EnvironmentUtils;
@@ -95,12 +96,15 @@ public final class JobLauncher {
final Configuration driverConf = getDriverConf(builtJobConf);
final Configuration driverNcsConf = getDriverNcsConf();
final Configuration driverMessageConfg = getDriverMessageConf();
- final Configuration executorResourceConfig = getExecutorResourceConf(builtJobConf);
+ final Configuration executorResourceConfig = getJSONConf(builtJobConf, JobConf.ExecutorJSONPath.class,
+ JobConf.ExecutorJSONContents.class);
+ final Configuration bandwidthConfig = getJSONConf(builtJobConf, JobConf.BandwidthJSONPath.class,
+ JobConf.BandwidthJSONContents.class);
final Configuration clientConf = getClientConf();
// Merge Job and Driver Confs
jobAndDriverConf = Configurations.merge(builtJobConf, driverConf, driverNcsConf, driverMessageConfg,
- executorResourceConfig, driverRPCServer.getListeningConfiguration());
+ executorResourceConfig, bandwidthConfig, driverRPCServer.getListeningConfiguration());
// Get DeployMode Conf
deployModeConf = Configurations.merge(getDeployModeConf(builtJobConf), clientConf);
@@ -237,7 +241,8 @@ public final class JobLauncher {
cl.registerShortNameOfClass(JobConf.OptimizationPolicy.class);
cl.registerShortNameOfClass(JobConf.DeployMode.class);
cl.registerShortNameOfClass(JobConf.DriverMemMb.class);
- cl.registerShortNameOfClass(JobConf.ExecutorJsonPath.class);
+ cl.registerShortNameOfClass(JobConf.ExecutorJSONPath.class);
+ cl.registerShortNameOfClass(JobConf.BandwidthJSONPath.class);
cl.registerShortNameOfClass(JobConf.JVMHeapSlack.class);
cl.registerShortNameOfClass(JobConf.IORequestHandleThreadsTotal.class);
cl.registerShortNameOfClass(JobConf.MaxScheduleAttempt.class);
@@ -277,19 +282,25 @@ public final class JobLauncher {
}
/**
- * Get executor resource configuration.
+ * Read json file and return its contents as configuration parameter.
*
- * @param jobConf job configuration to get executor json path.
- * @return executor resource configuration.
+ * @param jobConf job configuration to get json path.
+ * @param pathParameter named parameter represents path to the json file, or an empty string
+ * @param contentsParameter named parameter represents contents of the file
+ * @return configuration with contents of the file, or an empty string as value for {@code contentsParameter}
* @throws InjectionException exception while injection.
*/
- private static Configuration getExecutorResourceConf(final Configuration jobConf) throws InjectionException {
+ private static Configuration getJSONConf(final Configuration jobConf,
+ final Class<? extends Name<String>> pathParameter,
+ final Class<? extends Name<String>> contentsParameter)
+ throws InjectionException {
final Injector injector = TANG.newInjector(jobConf);
try {
- final String path = injector.getNamedInstance(JobConf.ExecutorJsonPath.class);
- final String contents = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
+ final String path = injector.getNamedInstance(pathParameter);
+ final String contents = path.isEmpty() ? ""
+ : new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
return TANG.newConfigurationBuilder()
- .bindNamedParameter(JobConf.ExecutorJsonContents.class, contents)
+ .bindNamedParameter(contentsParameter, contents)
.build();
} catch (final IOException e) {
throw new RuntimeException(e);
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/NodeNamesProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/NodeNamesProperty.java
new file mode 100644
index 0000000..82cddd9
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/NodeNamesProperty.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.executionproperty;
+
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+
+import java.util.HashMap;
+
+/**
+ * Vertex execution property that maps each node name to the number of the vertex's tasks
+ * (i.e. the part of its parallelism) that will run on that node.
+ */
+public final class NodeNamesProperty extends VertexExecutionProperty<HashMap<String, Integer>> {
+  /**
+   * Creates the property.
+   *
+   * @param value map from node name to the number of tasks to execute on that node
+   */
+  public NodeNamesProperty(final HashMap<String, Integer> value) {
+    super(value);
+  }
+
+  /**
+   * Static factory; equivalent to calling {@link #NodeNamesProperty(HashMap)}.
+   *
+   * @param value map from node name to the number of tasks to execute on that node
+   * @return a new property wrapping {@code value}
+   */
+  public static NodeNamesProperty of(final HashMap<String, Integer> value) {
+    return new NodeNamesProperty(value);
+  }
+}
diff --git a/compiler/optimizer/pom.xml b/compiler/optimizer/pom.xml
index 1c65254..8195526 100644
--- a/compiler/optimizer/pom.xml
+++ b/compiler/optimizer/pom.xml
@@ -37,6 +37,11 @@ limitations under the License.
<dependencies>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ <version>${commons-math.version}</version>
+ </dependency>
+ <dependency>
<groupId>edu.snu.nemo</groupId>
<artifactId>nemo-common</artifactId>
<version>${project.version}</version>
@@ -46,5 +51,15 @@ limitations under the License.
<artifactId>nemo-runtime-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java
new file mode 100644
index 0000000..8c6d8a1
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.NodeNamesProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.commons.math3.optim.BaseOptimizer;
+import org.apache.commons.math3.optim.PointValuePair;
+import org.apache.commons.math3.optim.linear.*;
+import org.apache.commons.math3.optim.nonlinear.scalar.GoalType;
+import org.apache.commons.math3.util.Incrementor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.*;
+
+/**
+ * Computes and assigns appropriate share of nodes to each irVertex to minimize shuffle time,
+ * with respect to bandwidth restrictions of nodes. If bandwidth information is not given, this pass does nothing.
+ * This pass follows task assignment of Iridium-style optimization.
+ * http://pages.cs.wisc.edu/~akella/papers/gda-sigcomm15.pdf
+ *
+ * <h3>Assumptions</h3>
+ * This pass assumes no skew in input or intermediate data, so that the number of Task assigned to a node
+ * is proportional to the data size handled by the node.
+ * Also, this pass assumes stages with empty map as {@link NodeNamesProperty} are assigned to nodes evenly.
+ * For example, if source splits are not distributed evenly, any source location-aware scheduling policy will
+ * assign TaskGroups unevenly.
+ * Also, this pass assumes network bandwidth to be the bottleneck. Each node should have enough capacity to run
+ * TaskGroups immediately as scheduler attempts to schedule a TaskGroup.
+ */
+public final class NodeNamesAssignmentPass extends AnnotatingPass {
+
+ // Index of the objective parameter, in the coefficient vector
+ private static final int OBJECTIVE_COEFFICIENT_INDEX = 0;
+ private static final Logger LOG = LoggerFactory.getLogger(NodeNamesAssignmentPass.class);
+ private static final HashMap<String, Integer> EMPTY_MAP = new HashMap<>();
+
+ private static String bandwidthSpecificationString = "";
+
+
+ /**
+ * Default constructor.
+ */
+ public NodeNamesAssignmentPass() {
+ super(NodeNamesProperty.class, Collections.singleton(ParallelismProperty.class));
+ }
+
+ @Override
+ public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+ if (bandwidthSpecificationString.isEmpty()) {
+ dag.topologicalDo(irVertex -> irVertex.setProperty(NodeNamesProperty.of(EMPTY_MAP)));
+ } else {
+ assignNodeShares(dag, BandwidthSpecification.fromJsonString(bandwidthSpecificationString));
+ }
+ return dag;
+ }
+
+ public static void setBandwidthSpecificationString(final String value) {
+ bandwidthSpecificationString = value;
+ }
+
+ private static void assignNodeShares(
+ final DAG<IRVertex, IREdge> dag,
+ final BandwidthSpecification bandwidthSpecification) {
+ dag.topologicalDo(irVertex -> {
+ final Collection<IREdge> inEdges = dag.getIncomingEdgesOf(irVertex);
+ final int parallelism = irVertex.getPropertyValue(ParallelismProperty.class)
+ .orElseThrow(() -> new RuntimeException("Parallelism property required"));
+ if (inEdges.size() == 0) {
+ // The stage is root stage.
+ // Fall back to setting even distribution
+ final HashMap<String, Integer> shares = new HashMap<>();
+ final List<String> nodes = bandwidthSpecification.getNodes();
+ final int defaultShare = parallelism / nodes.size();
+ final int remainder = parallelism % nodes.size();
+ for (int i = 0; i < nodes.size(); i++) {
+ shares.put(nodes.get(i), defaultShare + (i < remainder ? 1 : 0));
+ }
+ irVertex.getExecutionProperties().put(NodeNamesProperty.of(shares));
+ } else if (isOneToOneEdge(inEdges)) {
+ final Optional<NodeNamesProperty> property = dag.getIncomingEdgesOf(irVertex).iterator().next()
+ .getExecutionProperties().get(NodeNamesProperty.class);
+ irVertex.getExecutionProperties().put(property.get());
+ } else {
+ // This IRVertex has shuffle inEdge(s), or has multiple inEdges.
+ final Map<String, Integer> parentLocationShares = new HashMap<>();
+ for (final IREdge edgeToIRVertex : dag.getIncomingEdgesOf(irVertex)) {
+ final IRVertex parentVertex = edgeToIRVertex.getSrc();
+ final Map<String, Integer> shares = parentVertex.getPropertyValue(NodeNamesProperty.class).get();
+ for (final Map.Entry<String, Integer> element : shares.entrySet()) {
+ parentLocationShares.putIfAbsent(element.getKey(), 0);
+ parentLocationShares.put(element.getKey(),
+ element.getValue() + parentLocationShares.get(element.getKey()));
+ }
+ }
+ final double[] ratios = optimize(bandwidthSpecification, parentLocationShares);
+ final HashMap<String, Integer> shares = new HashMap<>();
+ for (int i = 0; i < bandwidthSpecification.getNodes().size(); i++) {
+ shares.put(bandwidthSpecification.getNodes().get(i), (int) (ratios[i] * parallelism));
+ }
+ int remainder = parallelism - shares.values().stream().mapToInt(i -> i).sum();
+ for (final String nodeName : shares.keySet()) {
+ if (remainder == 0) {
+ break;
+ }
+ shares.put(nodeName, shares.get(nodeName) + 1);
+ remainder--;
+ }
+ irVertex.getExecutionProperties().put(NodeNamesProperty.of(shares));
+ }
+ });
+ }
+
+ /**
+ * @param inEdges list of inEdges to the specific irVertex
+ * @return true if and only if the irVertex has one OneToOne edge
+ */
+ private static boolean isOneToOneEdge(final Collection<IREdge> inEdges) {
+ return inEdges.size() == 1 && inEdges.iterator().next()
+ .getPropertyValue(DataCommunicationPatternProperty.class).get()
+ .equals(DataCommunicationPatternProperty.Value.OneToOne);
+ }
+
+ /**
+ * Computes share of parallelism that each node is responsible for.
+ * @param bandwidthSpecification provides bandwidth information between nodes
+ * @param parentNodeShares shares of parallelism for the parent vertex
+ * @return array of fractions of parallelism that each node is responsible for
+ */
+ private static double[] optimize(final BandwidthSpecification bandwidthSpecification,
+ final Map<String, Integer> parentNodeShares) {
+ final int parentParallelism = parentNodeShares.values().stream().mapToInt(i -> i).sum();
+ final List<String> nodeNames = bandwidthSpecification.getNodes();
+ final List<LinearConstraint> constraints = new ArrayList<>();
+ final int coefficientVectorSize = nodeNames.size() + 1;
+
+ for (int i = 0; i < nodeNames.size(); i++) {
+ final String nodeName = nodeNames.get(i);
+ final int nodeCoefficientIndex = i + 1;
+ final int parentParallelismOnThisLocation = parentNodeShares.get(nodeName);
+
+ // Upload bandwidth
+ final double[] uploadCoefficientVector = new double[coefficientVectorSize];
+ uploadCoefficientVector[OBJECTIVE_COEFFICIENT_INDEX] = bandwidthSpecification.up(nodeName);
+ uploadCoefficientVector[nodeCoefficientIndex] = parentParallelismOnThisLocation;
+ constraints.add(new LinearConstraint(uploadCoefficientVector, Relationship.GEQ,
+ parentParallelismOnThisLocation));
+
+ // Download bandwidth
+ final double[] downloadCoefficientVector = new double[coefficientVectorSize];
+ downloadCoefficientVector[OBJECTIVE_COEFFICIENT_INDEX] = bandwidthSpecification.down(nodeName);
+ downloadCoefficientVector[nodeCoefficientIndex] = parentParallelismOnThisLocation - parentParallelism;
+ constraints.add(new LinearConstraint(downloadCoefficientVector, Relationship.GEQ, 0));
+
+ // The coefficient is non-negative
+ final double[] nonNegativeCoefficientVector = new double[coefficientVectorSize];
+ nonNegativeCoefficientVector[nodeCoefficientIndex] = 1;
+ constraints.add(new LinearConstraint(nonNegativeCoefficientVector, Relationship.GEQ, 0));
+ }
+
+ // The sum of all coefficient is 1
+ final double[] sumCoefficientVector = new double[coefficientVectorSize];
+ for (int i = 0; i < nodeNames.size(); i++) {
+ sumCoefficientVector[OBJECTIVE_COEFFICIENT_INDEX + 1 + i] = 1;
+ }
+ constraints.add(new LinearConstraint(sumCoefficientVector, Relationship.EQ, 1));
+
+ // Objective
+ final double[] objectiveCoefficientVector = new double[coefficientVectorSize];
+ objectiveCoefficientVector[OBJECTIVE_COEFFICIENT_INDEX] = 1;
+ final LinearObjectiveFunction objectiveFunction = new LinearObjectiveFunction(objectiveCoefficientVector, 0);
+
+ // Solve
+ try {
+ final SimplexSolver solver = new SimplexSolver();
+ final Field iterations = BaseOptimizer.class.getDeclaredField("iterations");
+ iterations.setAccessible(true);
+ final Incrementor incrementor = (Incrementor) iterations.get(solver);
+ incrementor.setMaximalCount(2147483647);
+ LOG.info(String.format("Max iterations: %d", solver.getMaxIterations()));
+ final PointValuePair solved = solver.optimize(
+ new LinearConstraintSet(constraints), objectiveFunction, GoalType.MINIMIZE);
+
+ return Arrays.copyOfRange(solved.getPoint(), OBJECTIVE_COEFFICIENT_INDEX + 1, coefficientVectorSize);
+ } catch (final NoSuchFieldException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Bandwidth specification.
+ */
+ private static final class BandwidthSpecification {
+ private final List<String> nodeNames = new ArrayList<>();
+ private final Map<String, Integer> uplinkBandwidth = new HashMap<>();
+ private final Map<String, Integer> downlinkBandwidth = new HashMap<>();
+
+ private BandwidthSpecification() {
+ }
+
+ static BandwidthSpecification fromJsonString(final String jsonString) {
+ final BandwidthSpecification specification = new BandwidthSpecification();
+ try {
+ final ObjectMapper objectMapper = new ObjectMapper();
+ final TreeNode jsonRootNode = objectMapper.readTree(jsonString);
+ for (int i = 0; i < jsonRootNode.size(); i++) {
+ final TreeNode locationNode = jsonRootNode.get(i);
+ final String name = locationNode.get("name").traverse().nextTextValue();
+ final int up = locationNode.get("up").traverse().getIntValue();
+ final int down = locationNode.get("down").traverse().getIntValue();
+ specification.nodeNames.add(name);
+ specification.uplinkBandwidth.put(name, up);
+ specification.downlinkBandwidth.put(name, down);
+ }
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ return specification;
+ }
+
+ int up(final String nodeName) {
+ return uplinkBandwidth.get(nodeName);
+ }
+
+ int down(final String nodeName) {
+ return downlinkBandwidth.get(nodeName);
+ }
+
+ List<String> getNodes() {
+ return nodeNames;
+ }
+ }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
index a30abec..dc8abfc 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
@@ -39,6 +39,7 @@ public final class PrimitiveCompositePass extends CompositePass {
new CompressionPass(),
new DecompressionPass(),
new SourceLocationAwareSchedulingPass(),
+ new NodeNamesAssignmentPass(),
new ExecutorSlotCompliancePass()
));
}
diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index 1e4ef4d..bed8099 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -116,11 +116,26 @@ public final class JobConf extends ConfigurationModuleBuilder {
}
/**
+ * Path to the JSON file that specifies bandwidth between locations.
+ */
+ @NamedParameter(doc = "Path to the JSON file that specifies bandwidth between locations",
+ short_name = "bandwidth_json", default_value = "")
+ public final class BandwidthJSONPath implements Name<String> {
+ // The empty default means "no bandwidth file": JobLauncher#getJSONConf then skips reading and binds
+ // BandwidthJSONContents to an empty string.
+ }
+
+ /**
* Path to the JSON file that specifies resource layout.
*/
@NamedParameter(doc = "Path to the JSON file that specifies resources for executors", short_name = "executor_json",
default_value = "examples/resources/sample_executor_resources.json")
- public final class ExecutorJsonPath implements Name<String> {
+ public final class ExecutorJSONPath implements Name<String> {
+ }
+
+ /**
+ * Contents of the JSON file that specifies bandwidth between locations.
+ */
+ @NamedParameter(doc = "Contents of JSON file that specifies bandwidth between locations")
+ public final class BandwidthJSONContents implements Name<String> {
}
/**
@@ -135,7 +150,7 @@ public final class JobConf extends ConfigurationModuleBuilder {
* Contents of the JSON file that specifies resource layout.
*/
@NamedParameter(doc = "Contents of JSON file that specifies resources for executors")
- public final class ExecutorJsonContents implements Name<String> {
+ public final class ExecutorJSONContents implements Name<String> {
}
/**
diff --git a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysis.java b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysis.java
new file mode 100644
index 0000000..1f2f990
--- /dev/null
+++ b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysis.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam;
+
+import edu.snu.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import edu.snu.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * An app that analyzes data flow from network trace.
+ * Each line in the output file represents a host, containing the standard deviation of the lengths of packets
+ * that flows into the host (reads input0 file), and the standard deviation of the lengths of packets
+ * that flows out from the host (reads input1 file).
+ */
+public final class NetworkTraceAnalysis {
+ /**
+ * Private constructor.
+ */
+ private NetworkTraceAnalysis() {
+ }
+
+ /**
+ * Main function for the Beam program.
+ * @param args arguments.
+ */
+ public static void main(final String[] args) {
+ final String input0FilePath = args[0];
+ final String input1FilePath = args[1];
+ final String outputFilePath = args[2];
+ final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+ options.setRunner(NemoPipelineRunner.class);
+ options.setJobName("NetworkTraceAnalysis");
+
+ // Given "4 0.0 192.168.3.1 -> 192.168.0.2 Len=29", this finds "192.168.3.1", "192.168.0.2" and "29"
+ final Pattern pattern = Pattern.compile(" *\\d+ +[0-9.]+ +([0-9.]+) -> ([0-9.]+) +.*Len=(\\d+)");
+
+ final SimpleFunction<String, Boolean> filter = new SimpleFunction<String, Boolean>() {
+ @Override
+ public Boolean apply(final String line) {
+ return pattern.matcher(line).find();
+ }
+ };
+ final SimpleFunction<KV<String, Iterable<KV<String, Long>>>, KV<String, Long>> mapToStdev
+ = new SimpleFunction<KV<String, Iterable<KV<String, Long>>>, KV<String, Long>>() {
+ @Override
+ public KV<String, Long> apply(final KV<String, Iterable<KV<String, Long>>> kv) {
+ return KV.of(kv.getKey(), stdev(kv.getValue()));
+ }
+ };
+
+ final Pipeline p = Pipeline.create(options);
+ final PCollection<KV<String, Long>> in0 = GenericSourceSink.read(p, input0FilePath)
+ .apply(Filter.by(filter))
+ .apply(MapElements.via(new SimpleFunction<String, KV<String, KV<String, Long>>>() {
+ @Override
+ public KV<String, KV<String, Long>> apply(final String line) {
+ final Matcher matcher = pattern.matcher(line);
+ matcher.find();
+ return KV.of(matcher.group(2), KV.of(matcher.group(1), Long.valueOf(matcher.group(3))));
+ }
+ }))
+ .apply(GroupByKey.create())
+ .apply(MapElements.via(mapToStdev));
+ final PCollection<KV<String, Long>> in1 = GenericSourceSink.read(p, input1FilePath)
+ .apply(Filter.by(filter))
+ .apply(MapElements.via(new SimpleFunction<String, KV<String, KV<String, Long>>>() {
+ @Override
+ public KV<String, KV<String, Long>> apply(final String line) {
+ final Matcher matcher = pattern.matcher(line);
+ matcher.find();
+ return KV.of(matcher.group(1), KV.of(matcher.group(2), Long.valueOf(matcher.group(3))));
+ }
+ }))
+ .apply(GroupByKey.create())
+ .apply(MapElements.via(mapToStdev));
+ final TupleTag<Long> tag0 = new TupleTag<>();
+ final TupleTag<Long> tag1 = new TupleTag<>();
+ final PCollection<KV<String, CoGbkResult>> joined =
+ KeyedPCollectionTuple.of(tag0, in0).and(tag1, in1).apply(CoGroupByKey.create());
+ final PCollection<String> result = joined
+ .apply(MapElements.via(new SimpleFunction<KV<String, CoGbkResult>, String>() {
+ @Override
+ public String apply(final KV<String, CoGbkResult> kv) {
+ final long source = getLong(kv.getValue().getAll(tag0));
+ final long destination = getLong(kv.getValue().getAll(tag1));
+ final String intermediate = kv.getKey();
+ return new StringBuilder(intermediate).append(",").append(source).append(",")
+ .append(destination).toString();
+ }
+ }));
+ GenericSourceSink.write(result, outputFilePath);
+ p.run();
+ }
+
+ private static long getLong(final Iterable<Long> data) {
+ for (final long datum : data) {
+ return datum;
+ }
+ return 0;
+ }
+
+ private static long stdev(final Iterable<KV<String, Long>> data) {
+ final StandardDeviation stdev = new StandardDeviation();
+ final List<Long> elements = new ArrayList<>();
+ for (final KV<String, Long> e : data) {
+ elements.add(e.getValue());
+ }
+ return Math.round(stdev.evaluate(elements.stream().mapToDouble(e -> e).toArray()));
+ }
+}
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysisITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysisITCase.java
new file mode 100644
index 0000000..f5e344c
--- /dev/null
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/NetworkTraceAnalysisITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam;
+
+import edu.snu.nemo.client.JobLauncher;
+import edu.snu.nemo.common.test.ArgBuilder;
+import edu.snu.nemo.common.test.ExampleTestUtil;
+import edu.snu.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
+import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismFive;
+import edu.snu.nemo.examples.beam.policy.SailfishPolicyParallelismFive;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Integration test for {@link NetworkTraceAnalysis}, executed under several optimization policies.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class NetworkTraceAnalysisITCase {
+  private static final int TIMEOUT = 120000;
+  private static final String FILE_BASE_PATH = System.getProperty("user.dir") + "/../resources/";
+
+  private static final String INPUT_FILE_NAME_0 = "sample_input_network0";
+  private static final String INPUT_FILE_NAME_1 = "sample_input_network1";
+  private static final String OUTPUT_FILE_NAME = "sample_output_network";
+  private static final String EXPECTED_OUTPUT_FILE_NAME = "test_output_network";
+  private static final String EXECUTOR_RESOURCE_FILE_PATH =
+      FILE_BASE_PATH + "beam_sample_executor_resources.json";
+
+  // Rebuilt before every test by setUp(), then completed with a job id and a policy by launch().
+  private ArgBuilder builder;
+
+  @Before
+  public void setUp() throws Exception {
+    builder = new ArgBuilder()
+        .addResourceJson(EXECUTOR_RESOURCE_FILE_PATH)
+        .addUserMain(NetworkTraceAnalysis.class.getCanonicalName())
+        .addUserArgs(FILE_BASE_PATH + INPUT_FILE_NAME_0, FILE_BASE_PATH + INPUT_FILE_NAME_1,
+            FILE_BASE_PATH + OUTPUT_FILE_NAME);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      ExampleTestUtil.ensureOutputValidity(FILE_BASE_PATH, OUTPUT_FILE_NAME, EXPECTED_OUTPUT_FILE_NAME);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(FILE_BASE_PATH, OUTPUT_FILE_NAME);
+    }
+  }
+
+  @Test(timeout = TIMEOUT)
+  public void test() throws Exception {
+    launch("", DefaultPolicyParallelismFive.class);
+  }
+
+  @Test(timeout = TIMEOUT)
+  public void testSailfish() throws Exception {
+    launch("_sailfish", SailfishPolicyParallelismFive.class);
+  }
+
+  @Test(timeout = TIMEOUT)
+  public void testPado() throws Exception {
+    launch("_pado", PadoPolicyParallelismFive.class);
+  }
+
+  /**
+   * Runs the job with the given optimization policy.
+   *
+   * @param jobIdSuffix suffix appended to this test class's simple name to form the job id
+   * @param policy      optimization policy class
+   * @throws Exception if launching the job fails
+   */
+  private void launch(final String jobIdSuffix, final Class<?> policy) throws Exception {
+    JobLauncher.main(builder
+        .addJobId(NetworkTraceAnalysisITCase.class.getSimpleName() + jobIdSuffix)
+        .addOptimizationPolicy(policy.getCanonicalName())
+        .build());
+  }
+}
diff --git a/examples/resources/sample_input_network0 b/examples/resources/sample_input_network0
new file mode 100644
index 0000000..55176d7
--- /dev/null
+++ b/examples/resources/sample_input_network0
@@ -0,0 +1,8 @@
+1 0.0 192.168.0.1 -> 192.168.0.2 Len=32
+2 0.0 192.168.1.1 -> 192.168.0.2 Len=31
+3 0.0 192.168.2.1 -> 192.168.0.2 Len=30
+4 0.0 192.168.3.1 -> 192.168.0.2 Len=29
+5 0.0 192.168.4.1 -> 192.168.0.2 Len=28
+6 0.0 192.168.5.1 -> 192.168.0.3 Len=27
+7 0.0 192.168.5.1 -> 192.168.0.3 Len=26
+8 0.0 192.168.1.1 -> 192.168.0.3 Len=25
diff --git a/examples/resources/sample_input_network1 b/examples/resources/sample_input_network1
new file mode 100644
index 0000000..f126482
--- /dev/null
+++ b/examples/resources/sample_input_network1
@@ -0,0 +1,8 @@
+1 0.0 192.168.0.2 -> 192.168.1.10 Len=32
+2 0.0 192.168.0.2 -> 192.168.1.10 Len=31
+3 0.0 192.168.0.2 -> 192.168.2.10 Len=30
+4 0.0 192.168.0.2 -> 192.168.2.10 Len=29
+5 0.0 192.168.0.2 -> 192.168.3.10 Len=16
+6 0.0 192.168.0.3 -> 192.168.3.10 Len=15
+7 0.0 192.168.0.3 -> 192.168.4.10 Len=14
+8 0.0 192.168.0.3 -> 192.168.4.10 Len=13
diff --git a/examples/resources/test_output_network b/examples/resources/test_output_network
new file mode 100644
index 0000000..a6f4115
--- /dev/null
+++ b/examples/resources/test_output_network
@@ -0,0 +1,2 @@
+192.168.0.2,2,7
+192.168.0.3,1,1
diff --git a/pom.xml b/pom.xml
index 42ad7ab..aa8837d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,6 +44,7 @@ limitations under the License.
<netty.version>4.1.16.Final</netty.version>
<jetty-server.version>9.4.10.v20180503</jetty-server.version>
<jetty-servlet.version>9.4.10.v20180503</jetty-servlet.version>
+ <commons-math.version>3.6.1</commons-math.version>
<slf4j.version>1.7.20</slf4j.version>
<!-- Tests -->
<mockito.version>2.13.0</mockito.version>
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
index d6c6ccc..f4493e8 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
@@ -16,6 +16,7 @@
package edu.snu.nemo.driver;
import edu.snu.nemo.common.ir.IdManager;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.NodeNamesAssignmentPass;
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
@@ -81,7 +82,8 @@ public final class NemoDriver {
final LocalAddressProvider localAddressProvider,
final JobMessageObserver client,
final ClientRPC clientRPC,
- @Parameter(JobConf.ExecutorJsonContents.class) final String resourceSpecificationString,
+ @Parameter(JobConf.ExecutorJSONContents.class) final String resourceSpecificationString,
+ @Parameter(JobConf.BandwidthJSONContents.class) final String bandwidthString,
@Parameter(JobConf.JobId.class) final String jobId,
@Parameter(JobConf.FileDirectory.class) final String localDirectory,
@Parameter(JobConf.GlusterVolumeDirectory.class) final String glusterDirectory) {
@@ -96,6 +98,8 @@ public final class NemoDriver {
this.glusterDirectory = glusterDirectory;
this.handler = new RemoteClientMessageLoggingHandler(client);
this.clientRPC = clientRPC;
+ // TODO #69: Support job-wide execution property
+ NodeNamesAssignmentPass.setBandwidthSpecificationString(bandwidthString);
clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.LaunchDAG,
message -> startSchedulingUserApplication(message.getLaunchDAG().getDag()));
// Send DriverStarted message to the client
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
new file mode 100644
index 0000000..584cd67
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.NodeNamesProperty;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.plan.Task;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+
+import javax.inject.Inject;
+import java.util.*;
+
+/**
+ * This constraint is to follow {@link NodeNamesProperty}.
+ *
+ * <p>A task is schedulable on an executor only if the executor's node name equals the node that the
+ * task's index maps to under the task's {@link NodeNamesProperty} value. An empty property value
+ * places no restriction.
+ */
+@AssociatedProperty(NodeNamesProperty.class)
+public final class NodeShareSchedulingConstraint implements SchedulingConstraint {
+
+  @Inject
+  private NodeShareSchedulingConstraint() {
+  }
+
+  /**
+   * Maps a task index to the node that should run it.
+   *
+   * <p>Node names are sorted in natural order, and each node owns a contiguous range of task indices
+   * whose length equals its share in {@code propertyValue}. For example, with {@code {a=2, b=1}},
+   * indices 0 and 1 map to "a" and index 2 maps to "b".
+   *
+   * @param propertyValue node name to the number of tasks assigned to that node
+   * @param taskIndex     index extracted from the task id
+   * @return the name of the node whose range contains {@code taskIndex}
+   * @throws IllegalStateException if the shares do not cover {@code taskIndex}
+   */
+  private String getNodeName(final Map<String, Integer> propertyValue, final int taskIndex) {
+    final List<String> nodeNames = new ArrayList<>(propertyValue.keySet());
+    nodeNames.sort(Comparator.naturalOrder());
+    int index = taskIndex;
+    for (final String nodeName : nodeNames) {
+      final int share = propertyValue.get(nodeName);
+      if (index < share) {
+        // The remaining index falls inside this node's range.
+        return nodeName;
+      }
+      // Skip past this node's range and continue with the next node.
+      index -= share;
+    }
+    throw new IllegalStateException("Detected excessive parallelism which NodeNamesProperty does not cover");
+  }
+
+  /**
+   * Checks whether {@code task} may run on {@code executor} under the task's node-share assignment.
+   *
+   * @param executor candidate executor
+   * @param task     task to place
+   * @return true if the property value is empty, or if the executor's node name equals the node
+   *         the task's index maps to
+   * @throws RuntimeException if the task has no {@link NodeNamesProperty}
+   */
+  @Override
+  public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
+    final Map<String, Integer> propertyValue = task.getPropertyValue(NodeNamesProperty.class)
+        .orElseThrow(() -> new RuntimeException("NodeNamesProperty expected"));
+    if (propertyValue.isEmpty()) {
+      return true;
+    }
+    return executor.getNodeName().equals(
+        getNodeName(propertyValue, RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId())));
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
index 97ab554..639e775 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
@@ -39,10 +39,12 @@ public final class SchedulingConstraintRegistry {
private SchedulingConstraintRegistry(
final ContainerTypeAwareSchedulingConstraint containerTypeAwareSchedulingConstraint,
final FreeSlotSchedulingConstraint freeSlotSchedulingConstraint,
- final SourceLocationAwareSchedulingConstraint sourceLocationAwareSchedulingConstraint) {
+ final SourceLocationAwareSchedulingConstraint sourceLocationAwareSchedulingConstraint,
+ final NodeShareSchedulingConstraint nodeShareSchedulingConstraint) {
registerSchedulingConstraint(containerTypeAwareSchedulingConstraint);
registerSchedulingConstraint(freeSlotSchedulingConstraint);
registerSchedulingConstraint(sourceLocationAwareSchedulingConstraint);
+ registerSchedulingConstraint(nodeShareSchedulingConstraint);
}
/**
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
index 41edef3..b6ac067 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -28,21 +28,21 @@ public final class PolicyBuilderTest {
@Test
public void testDisaggregationPolicy() {
final Policy disaggregationPolicy = new DisaggregationPolicy();
- assertEquals(16, disaggregationPolicy.getCompileTimePasses().size());
+ assertEquals(17, disaggregationPolicy.getCompileTimePasses().size());
assertEquals(0, disaggregationPolicy.getRuntimePasses().size());
}
@Test
public void testPadoPolicy() {
final Policy padoPolicy = new PadoPolicy();
- assertEquals(18, padoPolicy.getCompileTimePasses().size());
+ assertEquals(19, padoPolicy.getCompileTimePasses().size());
assertEquals(0, padoPolicy.getRuntimePasses().size());
}
@Test
public void testDataSkewPolicy() {
final Policy dataSkewPolicy = new DataSkewPolicy();
- assertEquals(20, dataSkewPolicy.getCompileTimePasses().size());
+ assertEquals(21, dataSkewPolicy.getCompileTimePasses().size());
assertEquals(1, dataSkewPolicy.getRuntimePasses().size());
}