Posted to commits@nemo.apache.org by jo...@apache.org on 2018/06/25 03:17:13 UTC
[incubator-nemo] branch master updated: [NEMO-102] Stage Partitioning by PhysicalPlanGenerator (#51)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new add2b55 [NEMO-102] Stage Partitioning by PhysicalPlanGenerator (#51)
add2b55 is described below
commit add2b555a57632761fbc5657c1215fd684b37760
Author: Jangho Seo <ja...@jangho.io>
AuthorDate: Mon Jun 25 12:17:10 2018 +0900
[NEMO-102] Stage Partitioning by PhysicalPlanGenerator (#51)
JIRA: [NEMO-102: Stage Partitioning by PhysicalPlanGenerator](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-102)
**Major changes:**
- Removed StageId property
- Replaced DefaultStagePartitioningPass with StagePartitioner in nemo-runtime-common
- Modified PhysicalPlanGenerator to use StagePartitioner
- Added stage-level properties. Properties shared by all vertices in a stage become stage-level properties (except for the properties ignored by StagePartitioner).
- Ad-hoc properties in Task and Stage, such as containerType, can now be handled by stage-level properties.
- Replaced ScheduleGroupPass with DefaultScheduleGroupPass, which does not require StageId property for assigning ScheduleGroupIndex
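
The stage-level property idea in the list above can be illustrated with a minimal Python sketch. This is not the Java code from this commit: the function name `stage_level_properties`, the dict representation of vertex properties, and the sample keys are all assumptions made for illustration only.

```python
def stage_level_properties(vertex_properties, ignored_keys=frozenset()):
    """Return the properties that every vertex of a stage shares.

    vertex_properties: list of dicts, one per vertex (hypothetical representation).
    ignored_keys: keys that are never promoted to stage level (hypothetical stand-in
                  for the properties ignored by StagePartitioner).
    """
    if not vertex_properties:
        return {}
    first, rest = vertex_properties[0], vertex_properties[1:]
    shared = {}
    for key, value in first.items():
        if key in ignored_keys:
            continue
        # Promote a property only if all other vertices carry the same key and value.
        if all(key in props and props[key] == value for props in rest):
            shared[key] = value
    return shared


# Hypothetical stage made of two vertices that differ only in 'Custom'.
stage_vertices = [
    {'Parallelism': 4, 'ExecutorPlacement': 'Transient', 'Custom': 'a'},
    {'Parallelism': 4, 'ExecutorPlacement': 'Transient', 'Custom': 'b'},
]
print(stage_level_properties(stage_vertices))
```

In this sketch `Custom` stays a vertex-level property because the two vertices disagree on its value, while the other two keys are promoted.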
**Minor changes to note:**
- Removed StageBuilder and StageEdgeBuilder
- Added a feature to the visualizer to display stage-level ExecutionProperties
- Modified visualizer to properly display other ExecutionProperties
- Modified visualizer to properly draw StageEdges so that those edges are not cut by stage boundaries
- Removed the parallelism equality check in DAGBuilder, which required the IR-level StageId property (the feature is replaced by the constructor of Stage and sanity checking by PhysicalPlanGenerator)
- Renamed DataStoreProperty to InterTaskDataStoreProperty because it only controls data flow that spans different stages
- Modified ExecutionPropertyMap#toString to emit canonical name of property key
- Added equality test cases to ExecutionPropertyMapTest
- Removed `idToIRVertex` parameter from the constructor of PhysicalPlan. (The parameter value can be inferred from the IR dag supplied to the constructor.)
- Increased the capacity of resources in `beam_sample_executor_resources.json`, because some integration test cases require scheduling a large ScheduleGroup at once. (For example, ScheduleGroup 0 in AlternatingLeastSquareITCase_pado requires 15 slots in the Transient resource.)
- Modified StageEdge to use consistent naming for executionProperties when emitting json (edgeProperties → executionProperties)
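
For the visualizer changes, the `propertiesToString` helper that this commit adds to `bin/json2dot.py` turns canonical property names into label text. The snippet below copies that helper from the diff and runs it on a made-up property map. The sample values are assumptions; the key names follow canonical class names that appear elsewhere in the diff.

```python
import re


def propertiesToString(properties):
    # Keep the simple class name, drop a trailing 'Property',
    # and join the sorted entries as key=value separated by <BR/>.
    return '<BR/>'.join(['{}={}'.format(re.sub('Property$', '', item[0].split('.')[-1]), item[1])
                         for item in sorted(properties.items())])


# Hypothetical executionProperties as they might appear in the emitted JSON.
sample = {
    'edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty': 4,
    'edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty': 'Transient',
}
print(propertiesToString(sample))  # ExecutorPlacement=Transient<BR/>Parallelism=4
```

Because the result contains `<BR/>`, it is only meaningful inside an HTML-like Graphviz label (`label=<...>`), which is how the updated script emits it.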
**Tests for the changes:**
- Added StagePartitionerTest to test StagePartitioner under various test scenarios.
- Renamed ScheduleGroupPassTest to DefaultScheduleGroupPassTest and made it test DefaultScheduleGroupPass under various test scenarios.
- Existing tests should cover changes on PhysicalPlanGenerator.
**Other comments:**
- The legacy ScheduleGroupPass forces stages that contain a SourceVertex to have a ScheduleGroupIndex of zero. Since DefaultScheduleGroupPass does not employ this kind of trick, in FaultToleranceTest ScheduleGroup 0 for `TestPlanGenerator.PlanType.TwoVerticesJoined` is split into two ScheduleGroups. That is why I made a modification like `if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {` in FaultToleranceTest.
resolves [NEMO-102](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-102)
---
bin/json2dot.py | 99 +++----
.../snu/nemo/common/coder/BytesDecoderFactory.java | 2 +-
.../edu/snu/nemo/common/coder/DecoderFactory.java | 1 +
.../src/main/java/edu/snu/nemo/common/dag/DAG.java | 2 +-
.../java/edu/snu/nemo/common/dag/DAGBuilder.java | 18 --
...operty.java => InterTaskDataStoreProperty.java} | 8 +-
.../ir/executionproperty/ExecutionPropertyMap.java | 33 +--
...tageIdProperty.java => SkipSerDesProperty.java} | 20 +-
.../ExecutionPropertyMapTest.java | 34 ++-
.../nemo/compiler/backend/nemo/NemoBackend.java | 3 +-
.../compiletime/annotating/CompressionPass.java | 3 -
.../annotating/DataSkewEdgeDataStorePass.java | 8 +-
.../compiletime/annotating/DecompressionPass.java | 3 -
.../DefaultEdgeUsedDataHandlingPass.java | 11 +-
...ass.java => DefaultInterTaskDataStorePass.java} | 18 +-
.../annotating/DefaultScheduleGroupPass.java | 291 +++++++++++++++++++++
.../annotating/DefaultStagePartitioningPass.java | 144 ----------
.../DisaggregationEdgeDataStorePass.java | 9 +-
.../annotating/PadoEdgeDataStorePass.java | 10 +-
.../annotating/SailfishEdgeDataStorePass.java | 9 +-
.../compiletime/annotating/ScheduleGroupPass.java | 145 ----------
.../composite/PrimitiveCompositePass.java | 7 +-
.../reshaping/SailfishRelayReshapingPass.java | 2 +
.../compiler/optimizer/policy/BasicPullPolicy.java | 6 +-
.../compiler/optimizer/policy/BasicPushPolicy.java | 6 +-
.../policy/DefaultPolicyWithSeparatePass.java | 10 +-
.../compiler/optimizer/policy/PolicyBuilder.java | 4 +-
.../resources/beam_sample_executor_resources.json | 4 +-
.../pass/runtime/DataSkewRuntimePass.java | 2 +-
.../snu/nemo/runtime/common/plan/PhysicalPlan.java | 13 +-
.../runtime/common/plan/PhysicalPlanGenerator.java | 165 +++++-------
.../edu/snu/nemo/runtime/common/plan/Stage.java | 48 ++--
.../snu/nemo/runtime/common/plan/StageBuilder.java | 100 -------
.../snu/nemo/runtime/common/plan/StageEdge.java | 6 +-
.../nemo/runtime/common/plan/StageEdgeBuilder.java | 114 --------
.../nemo/runtime/common/plan/StagePartitioner.java | 125 +++++++++
.../edu/snu/nemo/runtime/common/plan/Task.java | 29 +-
.../runtime/executor/data/BlockManagerWorker.java | 48 ++--
.../runtime/executor/datatransfer/InputReader.java | 11 +-
.../executor/datatransfer/OutputWriter.java | 4 +-
.../executor/datatransfer/DataTransferTest.java | 55 ++--
.../runtime/executor/task/TaskExecutorTest.java | 16 +-
.../master/scheduler/BatchSingleJobScheduler.java | 2 +-
.../ContainerTypeAwareSchedulingPolicy.java | 6 +-
.../master/scheduler/SingleJobTaskCollection.java | 21 +-
.../nemo/runtime/master/JobStateManagerTest.java | 2 +-
.../ContainerTypeAwareSchedulingPolicyTest.java | 6 +-
.../master/scheduler/FaultToleranceTest.java | 35 +--
.../master/scheduler/SingleTaskQueueTest.java | 2 +-
.../runtime/plangenerator/TestPlanGenerator.java | 2 +-
.../runtime/common/plan/StagePartitionerTest.java | 185 +++++++++++++
.../snu/nemo/tests/client/ClientEndpointTest.java | 2 +-
.../annotating/DefaultScheduleGroupPassTest.java | 291 +++++++++++++++++++++
.../annotating/ScheduleGroupPassTest.java | 70 -----
.../composite/DisaggregationPassTest.java | 28 +-
.../composite/PadoCompositePassTest.java | 12 +-
.../compiletime/composite/SailfishPassTest.java | 8 +-
.../optimizer/policy/PolicyBuilderTest.java | 12 +-
.../compiler/optimizer/policy/TestPolicy.java | 3 +-
.../runtime/common/plan/DAGConverterTest.java | 18 +-
60 files changed, 1308 insertions(+), 1043 deletions(-)
diff --git a/bin/json2dot.py b/bin/json2dot.py
index 6f2d339..a0262ef 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -26,9 +26,8 @@ import re
nextIdx = 0
-def edgePropertiesString(properties):
- prop = {p[0]: p[1] for p in properties.items() if p[0] != 'Coder'}
- return '/'.join(['SideInput' if x[0] == 'IsSideInput' else x[1].split('.')[-1] for x in sorted(prop.items())])
+def propertiesToString(properties):
+ return '<BR/>'.join(['{}={}'.format(re.sub('Property$', '', item[0].split('.')[-1]), item[1]) for item in sorted(properties.items())])
def getIdx():
global nextIdx
@@ -119,10 +118,6 @@ def Vertex(id, properties, state):
except:
pass
try:
- return Stage(id, properties)
- except:
- pass
- try:
return LoopVertex(id, properties)
except:
pass
@@ -138,33 +133,18 @@ class NormalVertex:
def dot(self):
color = 'black'
try:
- if (self.properties['executionProperties']['ExecutorPlacement'] == 'Transient'):
+ placement = self.properties['executionProperties']['edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty']
+ if (placement == 'Transient'):
color = 'orange'
- if (self.properties['executionProperties']['ExecutorPlacement'] == 'Reserved'):
+ if (placement == 'Reserved'):
color = 'green'
except:
pass
label = self.id
if self.state is not None:
- label += '\\n({})'.format(self.state)
- try:
- label += ' (p{})'.format(self.properties['executionProperties']['Parallelism'])
- except:
- pass
- try:
- label += ' (s{})'.format(self.properties['executionProperties']['ScheduleGroupIndex'])
- except:
- pass
+ label += '<BR/>({})'.format(self.state)
try:
- label += '\\n{}'.format(self.properties['source'])
- except:
- pass
- try:
- label += '\\n{}'.format(self.properties['runtimeVertexId'])
- except:
- pass
- try:
- label += '\\n{}'.format(self.properties['index'])
+ label += '<BR/>{}'.format(self.properties['source'])
except:
pass
try:
@@ -174,15 +154,19 @@ class NormalVertex:
class_name = transform[1].split('{')[0].split('.')[-1].split('$')[0].split('@')[0]
except IndexError:
class_name = '?'
- label += '\\n{}:{}'.format(transform_name, class_name)
+ label += '<BR/>{}:{}'.format(transform_name, class_name)
except:
pass
if ('class' in self.properties and self.properties['class'] == 'MetricCollectionBarrierVertex'):
shape = ', shape=box'
- label += '\\nMetricCollectionBarrier'
+ label += '<BR/>MetricCollectionBarrier'
else:
shape = ''
- dot = '{} [label="{}", color={}, style=filled, fillcolor="{}"{}];'.format(self.idx, label, color, stateToColor(self.state), shape)
+ try:
+ label += '<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(propertiesToString(self.properties['executionProperties']))
+ except:
+ pass
+ dot = '{} [label=<{}>, color={}, style=filled, fillcolor="{}"{}];'.format(self.idx, label, color, stateToColor(self.state), shape)
return dot
@property
def oneVertex(self):
@@ -191,27 +175,6 @@ class NormalVertex:
def logicalEnd(self):
return self.idx
-class Stage:
- def __init__(self, id, properties):
- self.id = id
- self.internalDAG = DAG(properties['stageInternalDAG'], JobState.empty())
- self.idx = getIdx()
- @property
- def dot(self):
- dot = ''
- dot += 'subgraph cluster_{} {{'.format(self.idx)
- dot += 'label = "{}";'.format(self.id)
- dot += 'color=blue;'
- dot += self.internalDAG.dot
- dot += '}'
- return dot
- @property
- def oneVertex(self):
- return next(iter(self.internalDAG.vertices.values())).oneVertex
- @property
- def logicalEnd(self):
- return 'cluster_{}'.format(self.idx)
-
class LoopVertex:
def __init__(self, id, properties):
self.id = id
@@ -226,10 +189,10 @@ class LoopVertex:
def dot(self):
label = self.id
try:
- label += ' (p{})'.format(self.executionProperties['Parallelism'])
+ label += '<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(propertiesToString(self.executionProperties))
except:
pass
- label += '\\n(Remaining iteration: {})'.format(self.remaining_iteration)
+ label += '<BR/>(Remaining iteration: {})'.format(self.remaining_iteration)
dot = 'subgraph cluster_{} {{'.format(self.idx)
dot += 'label = "{}";'.format(label)
dot += self.dag.dot
@@ -253,24 +216,30 @@ class LoopVertex:
class Stage:
def __init__(self, id, properties, state):
self.id = id
- self.irVertex = DAG(properties['irDag'], JobState.empty())
+ self.properties = properties
+ self.stageDAG = DAG(properties['irDag'], JobState.empty())
self.idx = getIdx()
self.state = state
+ self.executionProperties = self.properties['executionProperties']
@property
def dot(self):
if self.state.state is None:
state = ''
else:
state = ' ({})'.format(self.state.state)
+ label = '{}{}'.format(self.id, state)
+ if self.state.tasks:
+ label += '<BR/><BR/>{} Task(s):<BR/>{}'.format(len(self.state.tasks), self.state.taskStateSummary)
+ label += '<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(propertiesToString(self.executionProperties))
dot = 'subgraph cluster_{} {{'.format(self.idx)
- dot += 'label = "{}{}\\n\\n{} Task(s):\\n{}";'.format(self.id, state, len(self.state.tasks), self.state.taskStateSummary)
+ dot += 'label = <{}>;'.format(label)
dot += 'color=red; bgcolor="{}";'.format(stateToColor(self.state.state))
- dot += self.irVertex.dot
+ dot += self.stageDAG.dot
dot += '}'
return dot
@property
def oneVertex(self):
- return next(iter(self.irVertex.vertices.values())).oneVertex
+ return next(iter(self.stageDAG.vertices.values())).oneVertex
@property
def logicalEnd(self):
return 'cluster_{}'.format(self.idx)
@@ -305,8 +274,6 @@ class IREdge:
self.dst = dst
self.id = properties['id']
self.executionProperties = properties['executionProperties']
- self.encoderFactory = self.executionProperties['Encoder']
- self.decoderFactory = self.executionProperties['Decoder']
@property
def dot(self):
src = self.src
@@ -319,21 +286,19 @@ class IREdge:
dst = dst.internalDstFor(self.id)
except:
pass
- label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}<BR/>{}</FONT>'.format(self.id, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory)
+ label = '{}<BR/><FONT POINT-SIZE=\'8\'>{}</FONT>'.format(self.id, propertiesToString(self.executionProperties))
return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(src.oneVertex.idx,
dst.oneVertex.idx, src.logicalEnd, dst.logicalEnd, label)
class StageEdge:
def __init__(self, src, dst, properties):
- self.src = src.internalDAG.vertices[properties['srcVertex']]
- self.dst = dst.internalDAG.vertices[properties['dstVertex']]
+ self.src = src.stageDAG.vertices[properties['externalSrcVertexId']]
+ self.dst = dst.stageDAG.vertices[properties['externalDstVertexId']]
self.runtimeEdgeId = properties['runtimeEdgeId']
self.executionProperties = properties['executionProperties']
- self.encoderFactory = self.executionProperties['Encoder']
- self.decoderFactory = self.executionProperties['Decoder']
@property
def dot(self):
- label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}<BR/>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory)
+ label = '{}<BR/><FONT POINT-SIZE=\'8\'>{}</FONT>'.format(self.runtimeEdgeId, propertiesToString(self.executionProperties))
return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label)
@@ -343,11 +308,9 @@ class RuntimeEdge:
self.dst = dst
self.runtimeEdgeId = properties['runtimeEdgeId']
self.executionProperties = properties['executionProperties']
- self.encoderFactory = self.executionProperties['Encoder']
- self.decoderFactory = self.executionProperties['Decoder']
@property
def dot(self):
- label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}<BR/>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory)
+ label = '{}<BR/><FONT POINT-SIZE=\'8\'>{}</FONT>'.format(self.runtimeEdgeId, propertiesToString(self.executionProperties))
return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label)
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/BytesDecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/BytesDecoderFactory.java
index 069e254..e62e0b8 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/BytesDecoderFactory.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/BytesDecoderFactory.java
@@ -75,7 +75,7 @@ public final class BytesDecoderFactory implements DecoderFactory<byte[]> {
final int lengthToRead = byteOutputStream.getCount();
if (lengthToRead == 0) {
- throw new IOException("EoF (empty partition)!"); // TODO #?: use EOF exception instead of IOException.
+ throw new IOException("EoF (empty partition)!"); // TODO #120: use EOF exception instead of IOException.
}
final byte[] resultBytes = new byte[lengthToRead]; // Read the size of this byte array.
System.arraycopy(byteOutputStream.getBufDirectly(), 0, resultBytes, 0, lengthToRead);
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
index a93d843..dc67ff3 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
*
* @param <T> element type.
*/
+// TODO #120: Separate EOFException from Decoder Failures
public interface DecoderFactory<T> extends Serializable {
/**
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
index 1366ecc..46198f0 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
@@ -204,7 +204,7 @@ public final class DAG<V extends Vertex, E extends Edge<V>> implements Serializa
/**
* Indicates the traversal order of this DAG.
*/
- public enum TraversalOrder {
+ private enum TraversalOrder {
PreOrder,
PostOrder
}
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
index ed051f7..35c4330 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
@@ -23,8 +23,6 @@ import edu.snu.nemo.common.ir.vertex.SourceVertex;
import edu.snu.nemo.common.ir.vertex.LoopVertex;
import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
import edu.snu.nemo.common.exception.IllegalVertexOperationException;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
import java.io.Serializable;
import java.util.*;
@@ -259,22 +257,6 @@ public final class DAGBuilder<V extends Vertex, E extends Edge<V>> implements Se
throw new RuntimeException("DAG execution property check: "
+ "DataSizeMetricCollection edge is not compatible with push" + e.getId());
}));
- // All vertices with same Stage Id should have identical Parallelism execution property.
- final HashMap<Integer, Integer> stageIdToParallelismMap = new HashMap<>();
- vertices.stream().filter(v -> v instanceof IRVertex)
- .map(v -> (IRVertex) v)
- .forEach(v -> {
- final Optional<Integer> stageId = v.getPropertyValue(StageIdProperty.class);
- if (stageId.isPresent()) {
- if (!stageIdToParallelismMap.containsKey(stageId.get())) {
- stageIdToParallelismMap.put(stageId.get(), v.getPropertyValue(ParallelismProperty.class).get());
- } else if (!stageIdToParallelismMap.get(stageId.get())
- .equals(v.getPropertyValue(ParallelismProperty.class).get())) {
- throw new RuntimeException("DAG execution property check: vertices are in a same stage, "
- + "but has different parallelism execution properties: Stage" + stageId.get() + ": " + v.getId());
- }
- }
- });
}
/**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataStoreProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/InterTaskDataStoreProperty.java
similarity index 81%
rename from common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataStoreProperty.java
rename to common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/InterTaskDataStoreProperty.java
index 6e87844..af6c85c 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataStoreProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/InterTaskDataStoreProperty.java
@@ -20,12 +20,12 @@ import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
/**
* DataStore ExecutionProperty.
*/
-public final class DataStoreProperty extends EdgeExecutionProperty<DataStoreProperty.Value> {
+public final class InterTaskDataStoreProperty extends EdgeExecutionProperty<InterTaskDataStoreProperty.Value> {
/**
* Constructor.
* @param value value of the execution property.
*/
- private DataStoreProperty(final Value value) {
+ private InterTaskDataStoreProperty(final Value value) {
super(value);
}
@@ -34,8 +34,8 @@ public final class DataStoreProperty extends EdgeExecutionProperty<DataStoreProp
* @param value value of the new execution property.
* @return the newly created execution property.
*/
- public static DataStoreProperty of(final Value value) {
- return new DataStoreProperty(value);
+ public static InterTaskDataStoreProperty of(final Value value) {
+ return new InterTaskDataStoreProperty(value);
}
/**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
index c4b297b..fdbd5dc 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.common.ir.executionproperty;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
@@ -25,7 +25,6 @@ import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty
import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import javax.annotation.concurrent.NotThreadSafe;
@@ -33,6 +32,7 @@ import java.io.Serializable;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* ExecutionPropertyMap Class, which uses HashMap for keeping track of ExecutionProperties for vertices and edges.
@@ -67,22 +67,23 @@ public final class ExecutionPropertyMap<T extends ExecutionProperty> implements
switch (commPattern) {
case Shuffle:
map.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
- map.put(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+ map.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
break;
case BroadCast:
map.put(PartitionerProperty.of(PartitionerProperty.Value.IntactPartitioner));
- map.put(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+ map.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
break;
case OneToOne:
map.put(PartitionerProperty.of(PartitionerProperty.Value.IntactPartitioner));
- map.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+ map.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
break;
default:
map.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
- map.put(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+ map.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
}
return map;
}
+
/**
* Static initializer for irVertex.
* @param irVertex irVertex to keep the execution property of.
@@ -148,6 +149,13 @@ public final class ExecutionPropertyMap<T extends ExecutionProperty> implements
properties.values().forEach(action);
}
+ /**
+ * @return {@link Stream} of execution properties.
+ */
+ public Stream<T> stream() {
+ return properties.values().stream();
+ }
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
@@ -159,7 +167,7 @@ public final class ExecutionPropertyMap<T extends ExecutionProperty> implements
}
isFirstPair = false;
sb.append("\"");
- sb.append(entry.getKey());
+ sb.append(entry.getKey().getCanonicalName());
sb.append("\": \"");
sb.append(entry.getValue().getValue());
sb.append("\"");
@@ -174,17 +182,12 @@ public final class ExecutionPropertyMap<T extends ExecutionProperty> implements
if (this == obj) {
return true;
}
-
if (obj == null || getClass() != obj.getClass()) {
return false;
}
-
- ExecutionPropertyMap that = (ExecutionPropertyMap) obj;
-
- return new EqualsBuilder()
- .append(properties.values().stream().collect(Collectors.toSet()),
- that.properties.values().stream().collect(Collectors.toSet()))
- .isEquals();
+ final ExecutionPropertyMap that = (ExecutionPropertyMap) obj;
+ return properties.values().stream().collect(Collectors.toSet())
+ .equals(that.properties.values().stream().collect(Collectors.toSet()));
}
@Override
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/StageIdProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SkipSerDesProperty.java
similarity index 61%
rename from common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/StageIdProperty.java
rename to common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SkipSerDesProperty.java
index 0900992..f9668ed 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/StageIdProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SkipSerDesProperty.java
@@ -18,23 +18,25 @@ package edu.snu.nemo.common.ir.vertex.executionproperty;
import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
/**
- * StageId ExecutionProperty.
+ * Attaching this property makes runtime to skip serialization and deserialization for the vertex input and output.
+ * TODO #118: Implement Skipping (De)Serialization by ExecutionProperty
*/
-public final class StageIdProperty extends VertexExecutionProperty<Integer> {
+public final class SkipSerDesProperty extends VertexExecutionProperty<Boolean> {
+
+ private static final SkipSerDesProperty SKIP_SER_DES_PROPERTY = new SkipSerDesProperty();
+
/**
* Constructor.
- * @param value value of the execution property.
*/
- private StageIdProperty(final Integer value) {
- super(value);
+ private SkipSerDesProperty() {
+ super(true);
}
/**
* Static method exposing the constructor.
- * @param value value of the new execution property.
- * @return the newly created execution property.
+ * @return the execution property.
*/
- public static StageIdProperty of(final Integer value) {
- return new StageIdProperty(value);
+ public static SkipSerDesProperty of() {
+ return SKIP_SER_DES_PROPERTY;
}
}
diff --git a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
index 1ca18ff..ed27235 100644
--- a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
+++ b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
/**
* Test {@link ExecutionPropertyMap}.
@@ -57,8 +58,8 @@ public class ExecutionPropertyMapTest {
@Test
public void testPutGetAndRemove() {
- edgeMap.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
- assertEquals(DataStoreProperty.Value.MemoryStore, edgeMap.get(DataStoreProperty.class).get());
+ edgeMap.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
+ assertEquals(InterTaskDataStoreProperty.Value.MemoryStore, edgeMap.get(InterTaskDataStoreProperty.class).get());
edgeMap.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
assertEquals(DataFlowModelProperty.Value.Pull, edgeMap.get(DataFlowModelProperty.class).get());
edgeMap.put(EncoderProperty.of(EncoderFactory.DUMMY_ENCODER_FACTORY));
@@ -72,4 +73,33 @@ public class ExecutionPropertyMapTest {
vertexMap.put(ParallelismProperty.of(100));
assertEquals(100, vertexMap.get(ParallelismProperty.class).get().longValue());
}
+
+ @Test
+ public void testEquality() {
+ final ExecutionPropertyMap<ExecutionProperty> map0 = new ExecutionPropertyMap<>("map0");
+ final ExecutionPropertyMap<ExecutionProperty> map1 = new ExecutionPropertyMap<>("map1");
+ assertTrue(map0.equals(map1));
+ assertTrue(map1.equals(map0));
+ map0.put(ParallelismProperty.of(1));
+ assertFalse(map0.equals(map1));
+ assertFalse(map1.equals(map0));
+ map1.put(ParallelismProperty.of(1));
+ assertTrue(map0.equals(map1));
+ assertTrue(map1.equals(map0));
+ map1.put(ParallelismProperty.of(2));
+ assertFalse(map0.equals(map1));
+ assertFalse(map1.equals(map0));
+ map0.put(ParallelismProperty.of(2));
+ assertTrue(map0.equals(map1));
+ assertTrue(map1.equals(map0));
+ map0.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
+ assertFalse(map0.equals(map1));
+ assertFalse(map1.equals(map0));
+ map1.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push));
+ assertFalse(map0.equals(map1));
+ assertFalse(map1.equals(map0));
+ map1.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
+ assertTrue(map0.equals(map1));
+ assertTrue(map1.equals(map0));
+ }
}
diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 71626de..981ee72 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -58,8 +58,7 @@ public final class NemoBackend implements Backend<PhysicalPlan> {
public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
final PhysicalPlanGenerator physicalPlanGenerator) {
final DAG<Stage, StageEdge> stageDAG = irDAG.convert(physicalPlanGenerator);
- final PhysicalPlan physicalPlan = new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(),
- stageDAG, physicalPlanGenerator.getIdToIRVertex());
+ final PhysicalPlan physicalPlan = new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), stageDAG);
return physicalPlan;
}
}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
index 0ffddaa..0c84203 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
@@ -19,7 +19,6 @@ import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
/**
@@ -48,8 +47,6 @@ public final class CompressionPass extends AnnotatingPass {
@Override
public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream()
- .filter(e -> !vertex.getPropertyValue(StageIdProperty.class).get()
- .equals(e.getSrc().getPropertyValue(StageIdProperty.class).get()))
.filter(edge -> !edge.getPropertyValue(CompressionProperty.class).isPresent())
.forEach(edge -> edge.setProperty(CompressionProperty.of(compression))));
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeDataStorePass.java
index a0428a6..857069b 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeDataStorePass.java
@@ -17,9 +17,9 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
/**
* Pass to annotate the DAG for a job to perform data skew.
@@ -31,7 +31,7 @@ public final class DataSkewEdgeDataStorePass extends AnnotatingPass {
* Default constructor.
*/
public DataSkewEdgeDataStorePass() {
- super(DataStoreProperty.class);
+ super(InterTaskDataStoreProperty.class);
}
@Override
@@ -45,9 +45,9 @@ public final class DataSkewEdgeDataStorePass extends AnnotatingPass {
dag.getIncomingEdgesOf(v).forEach(edge -> {
// we want it to be in the same stage
if (edge.equals(edgeToUseMemory)) {
- edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+ edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
} else {
- edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+ edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
}
});
}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
index 6ec2fde..6283071 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
@@ -20,7 +20,6 @@ import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.DecompressionProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
/**
@@ -39,8 +38,6 @@ public final class DecompressionPass extends AnnotatingPass {
@Override
public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream()
- .filter(e -> !vertex.getPropertyValue(StageIdProperty.class).get()
- .equals(e.getSrc().getPropertyValue(StageIdProperty.class).get()))
// Find edges which have a compression property but not decompression property.
.filter(edge -> edge.getPropertyValue(CompressionProperty.class).isPresent()
&& !edge.getPropertyValue(DecompressionProperty.class).isPresent())
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeUsedDataHandlingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeUsedDataHandlingPass.java
index 81e16f7..0ea2d13 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeUsedDataHandlingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeUsedDataHandlingPass.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.UsedDataHandlingProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
@@ -32,7 +32,7 @@ public final class DefaultEdgeUsedDataHandlingPass extends AnnotatingPass {
* Default constructor.
*/
public DefaultEdgeUsedDataHandlingPass() {
- super(UsedDataHandlingProperty.class, Collections.singleton(DataStoreProperty.class));
+ super(UsedDataHandlingProperty.class, Collections.singleton(InterTaskDataStoreProperty.class));
}
@Override
@@ -40,9 +40,10 @@ public final class DefaultEdgeUsedDataHandlingPass extends AnnotatingPass {
dag.topologicalDo(irVertex ->
dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
if (!irEdge.getPropertyValue(UsedDataHandlingProperty.class).isPresent()) {
- final DataStoreProperty.Value dataStoreValue = irEdge.getPropertyValue(DataStoreProperty.class).get();
- if (DataStoreProperty.Value.MemoryStore.equals(dataStoreValue)
- || DataStoreProperty.Value.SerializedMemoryStore.equals(dataStoreValue)) {
+ final InterTaskDataStoreProperty.Value dataStoreValue
+ = irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get();
+ if (InterTaskDataStoreProperty.Value.MemoryStore.equals(dataStoreValue)
+ || InterTaskDataStoreProperty.Value.SerializedMemoryStore.equals(dataStoreValue)) {
irEdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Discard));
} else {
irEdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ReviseInterStageEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultInterTaskDataStorePass.java
similarity index 61%
rename from compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ReviseInterStageEdgeDataStorePass.java
rename to compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultInterTaskDataStorePass.java
index 35c8c5a..98bca9a 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ReviseInterStageEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultInterTaskDataStorePass.java
@@ -17,9 +17,8 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
import java.util.Collections;
import java.util.List;
@@ -27,12 +26,12 @@ import java.util.List;
/**
* Edge data store pass to process inter-stage memory store edges.
*/
-public final class ReviseInterStageEdgeDataStorePass extends AnnotatingPass {
+public final class DefaultInterTaskDataStorePass extends AnnotatingPass {
/**
* Default constructor.
*/
- public ReviseInterStageEdgeDataStorePass() {
- super(DataStoreProperty.class, Collections.singleton(StageIdProperty.class));
+ public DefaultInterTaskDataStorePass() {
+ super(InterTaskDataStoreProperty.class, Collections.emptySet());
}
@Override
@@ -40,13 +39,8 @@ public final class ReviseInterStageEdgeDataStorePass extends AnnotatingPass {
dag.getVertices().forEach(vertex -> {
final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
if (!inEdges.isEmpty()) {
- inEdges.forEach(edge -> {
- if (DataStoreProperty.Value.MemoryStore.equals(edge.getPropertyValue(DataStoreProperty.class).get())
- && !edge.getSrc().getPropertyValue(StageIdProperty.class).get()
- .equals(edge.getDst().getPropertyValue(StageIdProperty.class).get())) {
- edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
- }
- });
+ inEdges.forEach(edge -> edge.setProperty(
+ InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore)));
}
});
return dag;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
new file mode 100644
index 0000000..55c16c5
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.dag.Edge;
+import edu.snu.nemo.common.dag.Vertex;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A pass for assigning each stage to a schedule group.
+ *
+ * <h3>Rules</h3>
+ * <ul>
+ * <li>Vertices connected with push edges must be assigned the same ScheduleGroup.</li>
+ * <li>For pull edges,
+ * <ul>
+ * <li>if the destination of the edge depends on multiple ScheduleGroups, split ScheduleGroup by the edge.</li>
+ * <li>if the edge is broadcast type and {@code allowBroadcastWithinScheduleGroup} is {@code false},
+ * split ScheduleGroup by the edge.</li>
+ * <li>if the edge is shuffle type and {@code allowShuffleWithinScheduleGroup} is {@code false},
+ * split ScheduleGroup by the edge.</li>
+ * <li>if the destination has multiple inEdges and {@code allowMultipleInEdgesWithinScheduleGroup} is {@code false}, split ScheduleGroup by the edge.</li>
+ * <li>Otherwise, the source and the destination of the edge should be assigned the same ScheduleGroup.</li>
+ * </ul>
+ * </li>
+ * </ul>
+ */
+public final class DefaultScheduleGroupPass extends AnnotatingPass {
+
+ private final boolean allowBroadcastWithinScheduleGroup;
+ private final boolean allowShuffleWithinScheduleGroup;
+ private final boolean allowMultipleInEdgesWithinScheduleGroup;
+
+ /**
+ * Default constructor.
+ */
+ public DefaultScheduleGroupPass() {
+ this(false, false, true);
+ }
+
+ /**
+ * Constructor.
+ * @param allowBroadcastWithinScheduleGroup whether to allow Broadcast edges within a ScheduleGroup or not
+ * @param allowShuffleWithinScheduleGroup whether to allow Shuffle edges within a ScheduleGroup or not
+ * @param allowMultipleInEdgesWithinScheduleGroup whether to allow vertices with multiple inEdges within a ScheduleGroup or not
+ */
+ public DefaultScheduleGroupPass(final boolean allowBroadcastWithinScheduleGroup,
+ final boolean allowShuffleWithinScheduleGroup,
+ final boolean allowMultipleInEdgesWithinScheduleGroup) {
+ super(ScheduleGroupIndexProperty.class, Stream.of(
+ DataCommunicationPatternProperty.class,
+ DataFlowModelProperty.class
+ ).collect(Collectors.toSet()));
+ this.allowBroadcastWithinScheduleGroup = allowBroadcastWithinScheduleGroup;
+ this.allowShuffleWithinScheduleGroup = allowShuffleWithinScheduleGroup;
+ this.allowMultipleInEdgesWithinScheduleGroup = allowMultipleInEdgesWithinScheduleGroup;
+ }
+
+
+ @Override
+ public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+ final Map<IRVertex, ScheduleGroup> irVertexToScheduleGroupMap = new HashMap<>();
+ final Set<ScheduleGroup> scheduleGroups = new HashSet<>();
+ dag.topologicalDo(irVertex -> {
+ // Base case: for root vertices
+ if (!irVertexToScheduleGroupMap.containsKey(irVertex)) {
+ final ScheduleGroup newScheduleGroup = new ScheduleGroup();
+ scheduleGroups.add(newScheduleGroup);
+ newScheduleGroup.vertices.add(irVertex);
+ irVertexToScheduleGroupMap.put(irVertex, newScheduleGroup);
+ }
+ // Get the ScheduleGroup assigned to this vertex
+ final ScheduleGroup scheduleGroup = irVertexToScheduleGroupMap.get(irVertex);
+ if (scheduleGroup == null) {
+ throw new RuntimeException(String.format("ScheduleGroup must be set for %s", irVertex));
+ }
+ // Step case: inductively assign ScheduleGroup
+ for (final IREdge edge : dag.getOutgoingEdgesOf(irVertex)) {
+ final IRVertex connectedIRVertex = edge.getDst();
+ // Skip if some vertices that connectedIRVertex depends on have not been assigned a ScheduleGroup
+ boolean skip = false;
+ for (final IREdge edgeToConnectedIRVertex : dag.getIncomingEdgesOf(connectedIRVertex)) {
+ if (!irVertexToScheduleGroupMap.containsKey(edgeToConnectedIRVertex.getSrc())) {
+ // connectedIRVertex will be covered when edgeToConnectedIRVertex.getSrc() is visited
+ skip = true;
+ break;
+ }
+ }
+ if (skip) {
+ continue;
+ }
+ // Now we can be sure that all vertices that connectedIRVertex depends on have been assigned a ScheduleGroup
+
+ // Get ScheduleGroup(s) that push data to the connectedIRVertex
+ final Set<ScheduleGroup> pushScheduleGroups = new HashSet<>();
+ for (final IREdge edgeToConnectedIRVertex : dag.getIncomingEdgesOf(connectedIRVertex)) {
+ if (edgeToConnectedIRVertex.getPropertyValue(DataFlowModelProperty.class)
+ .orElseThrow(() -> new RuntimeException(String.format("DataFlowModelProperty for %s must be set",
+ edgeToConnectedIRVertex.getId()))) == DataFlowModelProperty.Value.Push) {
+ pushScheduleGroups.add(irVertexToScheduleGroupMap.get(edgeToConnectedIRVertex.getSrc()));
+ }
+ }
+ if (pushScheduleGroups.isEmpty()) {
+ // If allowMultipleInEdgesWithinScheduleGroup is false and connectedIRVertex depends on multiple vertices,
+ // it should be a member of a new ScheduleGroup
+ boolean mergability = allowMultipleInEdgesWithinScheduleGroup
+ || dag.getIncomingEdgesOf(connectedIRVertex).size() <= 1;
+ for (final IREdge edgeToConnectedIRVertex : dag.getIncomingEdgesOf(connectedIRVertex)) {
+ if (!mergability) {
+ break;
+ }
+ final ScheduleGroup anotherDependency = irVertexToScheduleGroupMap.get(edgeToConnectedIRVertex.getSrc());
+ if (!scheduleGroup.equals(anotherDependency)) {
+ // Since connectedIRVertex depends on multiple ScheduleGroups, connectedIRVertex must be a member of
+ // a new ScheduleGroup
+ mergability = false;
+ }
+ final DataCommunicationPatternProperty.Value communicationPattern = edgeToConnectedIRVertex
+ .getPropertyValue(DataCommunicationPatternProperty.class).orElseThrow(
+ () -> new RuntimeException(String.format("DataCommunicationPatternProperty for %s must be set",
+ edgeToConnectedIRVertex.getId())));
+ if (!allowBroadcastWithinScheduleGroup
+ && communicationPattern == DataCommunicationPatternProperty.Value.BroadCast) {
+ mergability = false;
+ }
+ if (!allowShuffleWithinScheduleGroup
+ && communicationPattern == DataCommunicationPatternProperty.Value.Shuffle) {
+ mergability = false;
+ }
+ }
+
+ if (mergability) {
+ // Merge into the existing scheduleGroup
+ scheduleGroup.vertices.add(connectedIRVertex);
+ irVertexToScheduleGroupMap.put(connectedIRVertex, scheduleGroup);
+ } else {
+ // Create a new ScheduleGroup
+ final ScheduleGroup newScheduleGroup = new ScheduleGroup();
+ scheduleGroups.add(newScheduleGroup);
+ newScheduleGroup.vertices.add(connectedIRVertex);
+ irVertexToScheduleGroupMap.put(connectedIRVertex, newScheduleGroup);
+ for (final IREdge edgeToConnectedIRVertex : dag.getIncomingEdgesOf(connectedIRVertex)) {
+ final ScheduleGroup src = irVertexToScheduleGroupMap.get(edgeToConnectedIRVertex.getSrc());
+ final ScheduleGroup dst = newScheduleGroup;
+ src.scheduleGroupsTo.add(dst);
+ dst.scheduleGroupsFrom.add(src);
+ }
+ }
+ } else {
+ // If there are multiple ScheduleGroups that push data to connectedIRVertex, merge them
+ final Iterator<ScheduleGroup> pushScheduleGroupIterator = pushScheduleGroups.iterator();
+ final ScheduleGroup pushScheduleGroup = pushScheduleGroupIterator.next();
+ while (pushScheduleGroupIterator.hasNext()) {
+ final ScheduleGroup anotherPushScheduleGroup = pushScheduleGroupIterator.next();
+ anotherPushScheduleGroup.vertices.forEach(pushScheduleGroup.vertices::add);
+ scheduleGroups.remove(anotherPushScheduleGroup);
+ for (final ScheduleGroup src : anotherPushScheduleGroup.scheduleGroupsFrom) {
+ final ScheduleGroup dst = anotherPushScheduleGroup;
+ final ScheduleGroup newDst = pushScheduleGroup;
+ src.scheduleGroupsTo.remove(dst);
+ src.scheduleGroupsTo.add(newDst);
+ newDst.scheduleGroupsFrom.add(src);
+ }
+ for (final ScheduleGroup dst : anotherPushScheduleGroup.scheduleGroupsTo) {
+ final ScheduleGroup src = anotherPushScheduleGroup;
+ final ScheduleGroup newSrc = pushScheduleGroup;
+ dst.scheduleGroupsFrom.remove(src);
+ dst.scheduleGroupsFrom.add(newSrc);
+ newSrc.scheduleGroupsTo.add(dst);
+ }
+ }
+ // Add connectedIRVertex into the merged pushScheduleGroup
+ pushScheduleGroup.vertices.add(connectedIRVertex);
+ irVertexToScheduleGroupMap.put(connectedIRVertex, pushScheduleGroup);
+ }
+ }
+ });
+
+ // Assign ScheduleGroupIndex property based on topology of ScheduleGroups
+ final MutableInt currentScheduleGroupIndex = new MutableInt(getNextScheudleGroupIndex(dag.getVertices()));
+ final DAGBuilder<ScheduleGroup, ScheduleGroupEdge> scheduleGroupDAGBuilder = new DAGBuilder<>();
+ scheduleGroups.forEach(scheduleGroupDAGBuilder::addVertex);
+ scheduleGroups.forEach(src -> src.scheduleGroupsTo
+ .forEach(dst -> scheduleGroupDAGBuilder.connectVertices(new ScheduleGroupEdge(src, dst))));
+ scheduleGroupDAGBuilder.build().topologicalDo(scheduleGroup -> {
+ boolean usedCurrentIndex = false;
+ for (final IRVertex irVertex : scheduleGroup.vertices) {
+ if (!irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).isPresent()) {
+ irVertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(currentScheduleGroupIndex.getValue()));
+ usedCurrentIndex = true;
+ }
+ }
+ if (usedCurrentIndex) {
+ currentScheduleGroupIndex.increment();
+ }
+ });
+ return dag;
+ }
+
+ /**
+ * Determines the range of {@link ScheduleGroupIndexProperty} value that will prevent collision
+ * with the existing {@link ScheduleGroupIndexProperty}.
+ * @param irVertexCollection collection of {@link IRVertex}
+ * @return the minimum value for the {@link ScheduleGroupIndexProperty} that won't collide with the existing values
+ */
+ private int getNextScheudleGroupIndex(final Collection<IRVertex> irVertexCollection) {
+ int nextScheduleGroupIndex = 0;
+ for (final IRVertex irVertex : irVertexCollection) {
+ final Optional<Integer> scheduleGroupIndex = irVertex.getPropertyValue(ScheduleGroupIndexProperty.class);
+ if (scheduleGroupIndex.isPresent()) {
+ nextScheduleGroupIndex = Math.max(scheduleGroupIndex.get() + 1, nextScheduleGroupIndex);
+ }
+ }
+ return nextScheduleGroupIndex;
+ }
+
+ /**
+ * Vertex in ScheduleGroup DAG.
+ */
+ private static final class ScheduleGroup extends Vertex {
+ private static int nextScheduleGroupId = 0;
+ private final Set<IRVertex> vertices = new HashSet<>();
+ private final Set<ScheduleGroup> scheduleGroupsTo = new HashSet<>();
+ private final Set<ScheduleGroup> scheduleGroupsFrom = new HashSet<>();
+
+ /**
+ * Constructor.
+ */
+ ScheduleGroup() {
+ super(String.format("ScheduleGroup-%d", nextScheduleGroupId++));
+ }
+ }
+
+ /**
+ * Edge in ScheduleGroup DAG.
+ */
+ private static final class ScheduleGroupEdge extends Edge<ScheduleGroup> {
+ private static int nextScheduleGroupEdgeId = 0;
+
+ /**
+ * Constructor.
+ *
+ * @param src source vertex.
+ * @param dst destination vertex.
+ */
+ ScheduleGroupEdge(final ScheduleGroup src, final ScheduleGroup dst) {
+ super(String.format("ScheduleGroupEdge-%d", nextScheduleGroupEdgeId++), src, dst);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final ScheduleGroupEdge that = (ScheduleGroupEdge) o;
+ return this.getSrc().equals(that.getSrc()) && this.getDst().equals(that.getDst());
+ }
+
+ @Override
+ public int hashCode() {
+ return getSrc().hashCode() + 31 * getDst().hashCode();
+ }
+ }
+}
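Note on the pull-edge rules in DefaultScheduleGroupPass above: the merge decision for a vertex whose incoming edges are all pull edges can be sketched in isolation as below. This is a minimal, simplified sketch and not part of the commit. The class name ScheduleGroupRuleSketch, the InEdge holder, the Pattern enum and the integer group ids are hypothetical stand-ins for IREdge, DataCommunicationPatternProperty.Value and the private ScheduleGroup class.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: mirrors the "mergability" check for pull edges, using hypothetical types. */
public final class ScheduleGroupRuleSketch {
  /** Hypothetical stand-in for DataCommunicationPatternProperty.Value. */
  enum Pattern { OneToOne, BroadCast, Shuffle }

  /** Hypothetical stand-in for an incoming pull edge: the group id of its source and its pattern. */
  static final class InEdge {
    final int srcGroup;
    final Pattern pattern;

    InEdge(final int srcGroup, final Pattern pattern) {
      this.srcGroup = srcGroup;
      this.pattern = pattern;
    }
  }

  /**
   * Returns true if the destination vertex may join currentGroup, false if it needs a new group.
   * Assumes that none of the incoming edges is a push edge.
   */
  static boolean canMerge(final int currentGroup, final List<InEdge> inEdges,
                          final boolean allowBroadcast, final boolean allowShuffle,
                          final boolean allowMultipleInEdges) {
    // Multiple in-edges force a split unless explicitly allowed.
    if (!allowMultipleInEdges && inEdges.size() > 1) {
      return false;
    }
    for (final InEdge edge : inEdges) {
      // Depending on another group forces a split.
      if (edge.srcGroup != currentGroup) {
        return false;
      }
      // Broadcast and shuffle edges force a split unless explicitly allowed.
      if (!allowBroadcast && edge.pattern == Pattern.BroadCast) {
        return false;
      }
      if (!allowShuffle && edge.pattern == Pattern.Shuffle) {
        return false;
      }
    }
    return true;
  }

  public static void main(final String[] args) {
    // Same flag values as the default constructor: (false, false, true).
    final List<InEdge> oneToOne = Arrays.asList(new InEdge(0, Pattern.OneToOne));
    final List<InEdge> shuffle = Arrays.asList(new InEdge(0, Pattern.Shuffle));
    final List<InEdge> twoGroups = Arrays.asList(new InEdge(0, Pattern.OneToOne), new InEdge(1, Pattern.OneToOne));
    System.out.println(canMerge(0, oneToOne, false, false, true));  // true: stays in group 0
    System.out.println(canMerge(0, shuffle, false, false, true));   // false: shuffle splits the group
    System.out.println(canMerge(0, twoGroups, false, false, true)); // false: depends on two groups
  }
}
```

The sketch leaves out the push-edge branch, in which all ScheduleGroups that push data to the destination are merged into one before the destination is added.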
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
deleted file mode 100644
index 7638e5d..0000000
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * Default method of partitioning an IR DAG into stages.
- * We traverse the DAG topologically to observe each vertex if it can be added to a stage or if it should be assigned
- * to a new stage. We filter out the candidate incoming edges to connect to an existing stage, and if it exists, we
- * connect it to the stage, and otherwise we don't.
- */
-public final class DefaultStagePartitioningPass extends AnnotatingPass {
- /**
- * Default constructor.
- */
- public DefaultStagePartitioningPass() {
- super(StageIdProperty.class, Stream.of(
- DataCommunicationPatternProperty.class,
- ExecutorPlacementProperty.class,
- DataFlowModelProperty.class,
- PartitionerProperty.class,
- ParallelismProperty.class
- ).collect(Collectors.toSet()));
- }
-
- @Override
- public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> irDAG) {
- final AtomicInteger stageNum = new AtomicInteger(0);
- final List<List<IRVertex>> vertexListForEachStage = groupVerticesByStage(irDAG);
- vertexListForEachStage.forEach(stageVertices -> {
- stageVertices.forEach(irVertex -> irVertex.setProperty(StageIdProperty.of(stageNum.get())));
- stageNum.getAndIncrement();
- });
- return irDAG;
- }
-
- /**
- * This method traverses the IR DAG to group each of the vertices by stages.
- * @param irDAG to traverse.
- * @return List of groups of vertices that are each divided by stages.
- */
- private List<List<IRVertex>> groupVerticesByStage(final DAG<IRVertex, IREdge> irDAG) {
- // Data structures used for stage partitioning.
- final HashMap<IRVertex, Integer> vertexStageNumHashMap = new HashMap<>();
- final List<List<IRVertex>> vertexListForEachStage = new ArrayList<>();
- final AtomicInteger stageNumber = new AtomicInteger(1);
-
- // First, traverse the DAG topologically to add each vertices to a list associated with each of the stage number.
- irDAG.topologicalDo(vertex -> {
- final List<IREdge> inEdges = irDAG.getIncomingEdgesOf(vertex);
- final Optional<List<IREdge>> inEdgeList = (inEdges == null || inEdges.isEmpty())
- ? Optional.empty() : Optional.of(inEdges);
-
- if (!inEdgeList.isPresent()) { // If Source vertex
- createNewStage(vertex, vertexStageNumHashMap, stageNumber, vertexListForEachStage);
- } else {
- // Filter candidate incoming edges that can be included in a stage with the vertex.
- final Optional<List<IREdge>> inEdgesForStage = inEdgeList.map(e -> e.stream()
- // One to one edges
- .filter(edge -> edge.getPropertyValue(DataCommunicationPatternProperty.class).get()
- .equals(DataCommunicationPatternProperty.Value.OneToOne))
- // MemoryStore placement
- .filter(edge -> edge.getPropertyValue(DataStoreProperty.class).get()
- .equals(DataStoreProperty.Value.MemoryStore))
- // if src and dst are placed on same container types
- .filter(edge -> edge.getSrc().getPropertyValue(ExecutorPlacementProperty.class).get()
- .equals(edge.getDst().getPropertyValue(ExecutorPlacementProperty.class).get()))
- // if src and dst have same parallelism
- .filter(edge -> edge.getSrc().getPropertyValue(ParallelismProperty.class).get()
- .equals(edge.getDst().getPropertyValue(ParallelismProperty.class).get()))
- // Src that is already included in a stage
- .filter(edge -> vertexStageNumHashMap.containsKey(edge.getSrc()))
- .collect(Collectors.toList()));
- // Choose one to connect out of the candidates. We want to connect the vertex to a single stage.
- final Optional<IREdge> edgeToConnect = inEdgesForStage.map(edges -> edges.stream().findAny())
- .orElse(Optional.empty());
-
- if (!inEdgesForStage.isPresent() || inEdgesForStage.get().isEmpty() || !edgeToConnect.isPresent()) {
- // when we cannot connect vertex in other stages
- createNewStage(vertex, vertexStageNumHashMap, stageNumber, vertexListForEachStage);
- } else {
- // otherwise connect with a stage.
- final IRVertex irVertexToConnect = edgeToConnect.get().getSrc();
- vertexStageNumHashMap.put(vertex, vertexStageNumHashMap.get(irVertexToConnect));
- final Optional<List<IRVertex>> listOfIRVerticesOfTheStage =
- vertexListForEachStage.stream().filter(l -> l.contains(irVertexToConnect)).findFirst();
- listOfIRVerticesOfTheStage.ifPresent(lst -> {
- vertexListForEachStage.remove(lst);
- lst.add(vertex);
- vertexListForEachStage.add(lst);
- });
- }
- }
- });
- return vertexListForEachStage;
- }
-
- /**
- * Creates a new stage.
- * @param irVertex the vertex which begins the stage.
- * @param vertexStageNumHashMap to keep track of vertex and its stage number.
- * @param stageNumber to atomically number stages.
- * @param vertexListForEachStage to group each vertex lists for each stages.
- */
- private static void createNewStage(final IRVertex irVertex, final HashMap<IRVertex, Integer> vertexStageNumHashMap,
- final AtomicInteger stageNumber,
- final List<List<IRVertex>> vertexListForEachStage) {
- vertexStageNumHashMap.put(irVertex, stageNumber.getAndIncrement());
- final List<IRVertex> newList = new ArrayList<>();
- newList.add(irVertex);
- vertexListForEachStage.add(newList);
- }
-}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
index 719de83..76983fd 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
@@ -16,9 +16,9 @@
package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
import java.util.Collections;
import java.util.List;
@@ -32,7 +32,7 @@ public final class DisaggregationEdgeDataStorePass extends AnnotatingPass {
* Default constructor.
*/
public DisaggregationEdgeDataStorePass() {
- super(DataStoreProperty.class, Collections.singleton(DataStoreProperty.class));
+ super(InterTaskDataStoreProperty.class, Collections.singleton(InterTaskDataStoreProperty.class));
}
@Override
@@ -40,10 +40,7 @@ public final class DisaggregationEdgeDataStorePass extends AnnotatingPass {
dag.getVertices().forEach(vertex -> { // Initialize the DataStore of the DAG with GlusterFileStore.
final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
inEdges.forEach(edge -> {
- if (DataStoreProperty.Value.LocalFileStore
- .equals(edge.getPropertyValue(DataStoreProperty.class).get())) {
- edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.GlusterFileStore));
- }
+ edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.GlusterFileStore));
});
});
return dag;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataStorePass.java
index 0777c1c..8e4023c 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataStorePass.java
@@ -17,9 +17,9 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
import java.util.Collections;
@@ -33,7 +33,7 @@ public final class PadoEdgeDataStorePass extends AnnotatingPass {
* Default constructor.
*/
public PadoEdgeDataStorePass() {
- super(DataStoreProperty.class, Collections.singleton(ExecutorPlacementProperty.class));
+ super(InterTaskDataStoreProperty.class, Collections.singleton(ExecutorPlacementProperty.class));
}
@Override
@@ -43,12 +43,12 @@ public final class PadoEdgeDataStorePass extends AnnotatingPass {
if (!inEdges.isEmpty()) {
inEdges.forEach(edge -> {
if (fromTransientToReserved(edge) || fromReservedToTransient(edge)) {
- edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+ edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
} else if (DataCommunicationPatternProperty.Value.OneToOne
.equals(edge.getPropertyValue(DataCommunicationPatternProperty.class).get())) {
- edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+ edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
} else {
- edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+ edge.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
}
});
}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataStorePass.java
index 6fae5ef..fe5d996 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataStorePass.java
@@ -18,8 +18,8 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
import java.util.Collections;
@@ -32,7 +32,7 @@ public final class SailfishEdgeDataStorePass extends AnnotatingPass {
* Default constructor.
*/
public SailfishEdgeDataStorePass() {
- super(DataStoreProperty.class, Collections.singleton(DataCommunicationPatternProperty.class));
+ super(InterTaskDataStoreProperty.class, Collections.singleton(DataCommunicationPatternProperty.class));
}
@Override
@@ -46,12 +46,13 @@ public final class SailfishEdgeDataStorePass extends AnnotatingPass {
if (DataCommunicationPatternProperty.Value.Shuffle
.equals(edgeToMerger.getPropertyValue(DataCommunicationPatternProperty.class).get())) {
// Pass data through memory to the merger vertex.
- edgeToMerger.setProperty(DataStoreProperty.of(DataStoreProperty.Value.SerializedMemoryStore));
+ edgeToMerger.setProperty(InterTaskDataStoreProperty
+ .of(InterTaskDataStoreProperty.Value.SerializedMemoryStore));
}
});
dag.getOutgoingEdgesOf(vertex).forEach(edgeFromMerger ->
// Merge the input data and write it immediately to the remote disk.
- edgeFromMerger.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore)));
+ edgeFromMerger.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore)));
}
});
return dag;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPass.java
deleted file mode 100644
index e76f4ba..0000000
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPass.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import com.google.common.collect.Lists;
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
-
-import java.util.*;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * A pass for assigning each stages in schedule groups.
- * We traverse the DAG topologically to find the dependency information between stages and number them appropriately
- * to give correct order or schedule groups.
- */
-public final class ScheduleGroupPass extends AnnotatingPass {
- /**
- * Default constructor.
- */
- public ScheduleGroupPass() {
- super(ScheduleGroupIndexProperty.class, Stream.of(
- StageIdProperty.class,
- DataCommunicationPatternProperty.class,
- ExecutorPlacementProperty.class,
- DataFlowModelProperty.class,
- PartitionerProperty.class,
- ParallelismProperty.class
- ).collect(Collectors.toSet()));
- }
-
- private static final int INITIAL_SCHEDULE_GROUP = 0;
-
- @Override
- public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
- // We assume that the input dag is tagged with stage ids.
- if (dag.getVertices().stream()
- .anyMatch(irVertex -> !irVertex.getPropertyValue(StageIdProperty.class).isPresent())) {
- throw new RuntimeException("There exists an IR vertex going through ScheduleGroupPass "
- + "without stage id tagged.");
- }
-
- // Map of stage id to the stage ids that it depends on.
- final Map<Integer, Set<Integer>> dependentStagesMap = new HashMap<>();
- dag.topologicalDo(irVertex -> {
- final Integer currentStageId = irVertex.getPropertyValue(StageIdProperty.class).get();
- dependentStagesMap.putIfAbsent(currentStageId, new HashSet<>());
- // while traversing, we find the stages that point to the current stage and add them to the list.
- dag.getIncomingEdgesOf(irVertex).stream()
- .map(IREdge::getSrc)
- .mapToInt(vertex -> vertex.getPropertyValue(StageIdProperty.class).get())
- .filter(n -> n != currentStageId)
- .forEach(n -> dependentStagesMap.get(currentStageId).add(n));
- });
-
- // Map to put our results in.
- final Map<Integer, Integer> stageIdToScheduleGroupIndexMap = new HashMap<>();
-
- // Calculate schedule group number of each stages step by step
- while (stageIdToScheduleGroupIndexMap.size() < dependentStagesMap.size()) {
- // This is to ensure that each iteration is making progress.
- // We ensure that the stageIdToScheduleGroupIdMap is increasing in size in each iteration.
- final Integer previousSize = stageIdToScheduleGroupIndexMap.size();
- dependentStagesMap.forEach((stageId, dependentStages) -> {
- if (!stageIdToScheduleGroupIndexMap.keySet().contains(stageId)
- && dependentStages.isEmpty()) { // initial source stages
- // initial source stages are indexed with schedule group 0.
- stageIdToScheduleGroupIndexMap.put(stageId, INITIAL_SCHEDULE_GROUP);
- } else if (!stageIdToScheduleGroupIndexMap.keySet().contains(stageId)
- && dependentStages.stream().allMatch(stageIdToScheduleGroupIndexMap::containsKey)) { // next stages
- // We find the maximum schedule group index from previous stages, and index current stage with that number +1.
- final Integer maxDependentSchedulerGroupIndex =
- dependentStages.stream()
- .mapToInt(stageIdToScheduleGroupIndexMap::get)
- .max().orElseThrow(() ->
- new RuntimeException("A stage that is not a source stage much have dependent stages"));
- stageIdToScheduleGroupIndexMap.put(stageId, maxDependentSchedulerGroupIndex + 1);
- }
- });
- if (previousSize == stageIdToScheduleGroupIndexMap.size()) {
- throw new RuntimeException("Iteration for indexing schedule groups in "
- + ScheduleGroupPass.class.getSimpleName() + " is not making progress");
- }
- }
-
- // Reverse topologically traverse and match schedule group ids for those that have push edges in between
- Lists.reverse(dag.getTopologicalSort()).forEach(v -> {
- // get the destination vertices of the edges that are marked as push
- final List<IRVertex> pushConnectedVertices = dag.getOutgoingEdgesOf(v).stream()
- .filter(e -> DataFlowModelProperty.Value.Push.equals(e.getPropertyValue(DataFlowModelProperty.class).get()))
- .map(IREdge::getDst)
- .collect(Collectors.toList());
- if (!pushConnectedVertices.isEmpty()) { // if we need to do something,
- // we find the min value of the destination schedule groups.
- final Integer newSchedulerGroupIndex = pushConnectedVertices.stream()
- .mapToInt(irVertex -> stageIdToScheduleGroupIndexMap
- .get(irVertex.getPropertyValue(StageIdProperty.class).get()))
- .min().orElseThrow(() -> new RuntimeException("a list was not empty, but produced an empty result"));
- // overwrite
- final Integer originalScheduleGroupIndex = stageIdToScheduleGroupIndexMap
- .get(v.getPropertyValue(StageIdProperty.class).get());
- stageIdToScheduleGroupIndexMap.replace(v.getPropertyValue(StageIdProperty.class).get(), newSchedulerGroupIndex);
- // shift those if it came too far
- if (stageIdToScheduleGroupIndexMap.values().stream()
- .noneMatch(stageIndex -> stageIndex.equals(originalScheduleGroupIndex))) { // if it doesn't exist
- stageIdToScheduleGroupIndexMap.replaceAll((stageId, scheduleGroupIndex) -> {
- if (scheduleGroupIndex > originalScheduleGroupIndex) {
- return scheduleGroupIndex - 1; // we shift schedule group indexes by one.
- } else {
- return scheduleGroupIndex;
- }
- });
- }
- }
- });
-
- // do the tagging
- dag.topologicalDo(irVertex -> irVertex.setProperty(ScheduleGroupIndexProperty.of(
- stageIdToScheduleGroupIndexMap.get(irVertex.getPropertyValue(StageIdProperty.class).get()))));
-
- return dag;
- }
-}
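For readers skimming the deleted ScheduleGroupPass above, its first phase (numbering stages by dependency depth) can be sketched on its own. This is a simplified illustration under stated assumptions: `ScheduleGroupSketch` and `assign` are hypothetical names, stages are plain integers, and the second phase of the removed pass (merging the schedule groups of stages connected by push edges) is deliberately omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified version of the indexing loop in the removed pass.
final class ScheduleGroupSketch {
  /**
   * @param dependencies map from stage id to the ids of the stages it depends on.
   * @return map from stage id to schedule group index: source stages get 0,
   *         every other stage gets (max index of its dependencies) + 1.
   */
  static Map<Integer, Integer> assign(final Map<Integer, Set<Integer>> dependencies) {
    final Map<Integer, Integer> result = new HashMap<>();
    while (result.size() < dependencies.size()) {
      final int previousSize = result.size();
      dependencies.forEach((stageId, deps) -> {
        // A stage can be indexed once all of its dependencies are indexed.
        if (!result.containsKey(stageId) && result.keySet().containsAll(deps)) {
          final int index = deps.stream().mapToInt(result::get).max().orElse(-1) + 1;
          result.put(stageId, index);
        }
      });
      // Same progress guard as the removed pass: fail instead of looping forever.
      if (previousSize == result.size()) {
        throw new IllegalStateException("Schedule group indexing is not making progress");
      }
    }
    return result;
  }
}
```

Per the commit message, the replacement DefaultScheduleGroupPass assigns ScheduleGroupIndex without requiring the StageId property; its implementation is not shown in this part of the diff, so the sketch reflects only the removed code.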
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
index b0ab2b1..3a5c80c 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
@@ -30,13 +30,12 @@ public final class PrimitiveCompositePass extends CompositePass {
*/
public PrimitiveCompositePass() {
super(Arrays.asList(
- new DefaultParallelismPass(), // annotating after reshaping passes, before stage partitioning
+ new DefaultParallelismPass(),
new DefaultEdgeEncoderPass(),
new DefaultEdgeDecoderPass(),
- new DefaultStagePartitioningPass(),
- new ReviseInterStageEdgeDataStorePass(), // after stage partitioning
+ new DefaultInterTaskDataStorePass(),
new DefaultEdgeUsedDataHandlingPass(),
- new ScheduleGroupPass(),
+ new DefaultScheduleGroupPass(),
new CompressionPass(),
new DecompressionPass()
));
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
index 9bf434a..1abe092 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
@@ -23,6 +23,7 @@ import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternPro
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.OperatorVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.SkipSerDesProperty;
import edu.snu.nemo.common.ir.vertex.transform.RelayTransform;
import java.util.Collections;
@@ -58,6 +59,7 @@ public final class SailfishRelayReshapingPass extends ReshapingPass {
// Insert a merger vertex having transform that write received data immediately
// before the vertex receiving shuffled data.
final OperatorVertex iFileMergerVertex = new OperatorVertex(new RelayTransform());
+ iFileMergerVertex.getExecutionProperties().put(SkipSerDesProperty.of());
builder.addVertex(iFileMergerVertex);
final IREdge newEdgeToMerger = new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
edge.getSrc(), iFileMergerVertex, edge.isSideInput());
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
index 124aa16..7fc4fba 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
@@ -16,8 +16,7 @@
package edu.snu.nemo.compiler.optimizer.policy;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
import java.util.ArrayList;
@@ -30,8 +29,7 @@ public final class BasicPullPolicy implements Policy {
@Override
public List<CompileTimePass> getCompileTimePasses() {
List<CompileTimePass> policy = new ArrayList<>();
- policy.add(new DefaultStagePartitioningPass());
- policy.add(new ScheduleGroupPass());
+ policy.add(new DefaultScheduleGroupPass());
return policy;
}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
index 595079e..53ff914 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
@@ -16,8 +16,7 @@
package edu.snu.nemo.compiler.optimizer.policy;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ShuffleEdgePushPass;
import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
@@ -31,9 +30,8 @@ public final class BasicPushPolicy implements Policy {
@Override
public List<CompileTimePass> getCompileTimePasses() {
List<CompileTimePass> policy = new ArrayList<>();
- policy.add(new DefaultStagePartitioningPass());
policy.add(new ShuffleEdgePushPass());
- policy.add(new ScheduleGroupPass());
+ policy.add(new DefaultScheduleGroupPass());
return policy;
}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
index 62193a6..d23d5bd 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
@@ -16,9 +16,8 @@
package edu.snu.nemo.compiler.optimizer.policy;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultParallelismPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ReviseInterStageEdgeDataStorePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultInterTaskDataStorePass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.CompositePass;
import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
@@ -63,9 +62,8 @@ public final class DefaultPolicyWithSeparatePass implements Policy {
*/
RefactoredPass() {
super(Arrays.asList(
- new DefaultStagePartitioningPass(),
- new ReviseInterStageEdgeDataStorePass(), // after stage partitioning
- new ScheduleGroupPass()
+ new DefaultInterTaskDataStorePass(),
+ new DefaultScheduleGroupPass()
));
}
}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
index 7b40556..5268c00 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
@@ -18,7 +18,7 @@ package edu.snu.nemo.compiler.optimizer.policy;
import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
@@ -67,7 +67,7 @@ public final class PolicyBuilder {
annotatedExecutionProperties.add(ExecutorPlacementProperty.class);
annotatedExecutionProperties.add(ParallelismProperty.class);
annotatedExecutionProperties.add(DataFlowModelProperty.class);
- annotatedExecutionProperties.add(DataStoreProperty.class);
+ annotatedExecutionProperties.add(InterTaskDataStoreProperty.class);
annotatedExecutionProperties.add(PartitionerProperty.class);
}
diff --git a/examples/resources/beam_sample_executor_resources.json b/examples/resources/beam_sample_executor_resources.json
index ced110c..0b40af9 100644
--- a/examples/resources/beam_sample_executor_resources.json
+++ b/examples/resources/beam_sample_executor_resources.json
@@ -2,11 +2,11 @@
{
"type": "Transient",
"memory_mb": 512,
- "capacity": 5
+ "capacity": 15
},
{
"type": "Reserved",
"memory_mb": 512,
- "capacity": 5
+ "capacity": 15
}
]
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 8693918..d8999b4 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -89,7 +89,7 @@ public final class DataSkewRuntimePass implements RuntimePass<Pair<List<String>,
optimizationEdge.setTaskIdxToKeyRange(taskIdxToHashRange);
});
- return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build(), originalPlan.getIdToIRVertex());
+ return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build());
}
/**
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
index fdd7edf..30f7111 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
@@ -19,6 +19,7 @@ import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -34,14 +35,18 @@ public final class PhysicalPlan implements Serializable {
*
* @param id ID of the plan.
* @param stageDAG the DAG of stages.
- * @param idToIRVertex map from task to IR vertex.
*/
public PhysicalPlan(final String id,
- final DAG<Stage, StageEdge> stageDAG,
- final Map<String, IRVertex> idToIRVertex) {
+ final DAG<Stage, StageEdge> stageDAG) {
this.id = id;
this.stageDAG = stageDAG;
- this.idToIRVertex = idToIRVertex;
+
+ idToIRVertex = new HashMap<>();
+ for (final Stage stage : stageDAG.getVertices()) {
+ for (final IRVertex irVertex : stage.getIRDAG().getVertices()) {
+ idToIRVertex.put(irVertex.getId(), irVertex);
+ }
+ }
}
/**
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index a11997e..d679387 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -18,17 +18,19 @@ package edu.snu.nemo.runtime.common.plan;
import edu.snu.nemo.common.ir.Readable;
import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
import edu.snu.nemo.common.ir.vertex.*;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.DAGBuilder;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.exception.IllegalVertexOperationException;
import edu.snu.nemo.common.exception.PhysicalPlanGenerationException;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import org.apache.reef.tang.annotations.Parameter;
import javax.inject.Inject;
@@ -39,18 +41,21 @@ import java.util.function.Function;
* A function that converts an IR DAG to physical DAG.
*/
public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdge>, DAG<Stage, StageEdge>> {
- private final Map<String, IRVertex> idToIRVertex;
private final String dagDirectory;
+ private final StagePartitioner stagePartitioner;
/**
* Private constructor.
*
+ * @param stagePartitioner provides stage partitioning
* @param dagDirectory the directory in which to store DAG data.
*/
@Inject
- private PhysicalPlanGenerator(@Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
- this.idToIRVertex = new HashMap<>();
+ private PhysicalPlanGenerator(final StagePartitioner stagePartitioner,
+ @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
this.dagDirectory = dagDirectory;
+ this.stagePartitioner = stagePartitioner;
+ stagePartitioner.addIgnoredPropertyKey(DynamicOptimizationProperty.class);
}
/**
@@ -64,6 +69,9 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
// first, stage-partition the IR DAG.
final DAG<Stage, StageEdge> dagOfStages = stagePartitionIrDAG(irDAG);
+ // Sanity check
+ dagOfStages.getVertices().forEach(this::integrityCheck);
+
// this is needed because of DuplicateEdgeGroupProperty.
handleDuplicateEdgeGroupProperty(dagOfStages);
@@ -73,10 +81,6 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
return dagOfStages;
}
- public Map<String, IRVertex> getIdToIRVertex() {
- return idToIRVertex;
- }
-
/**
* Convert the edge id of DuplicateEdgeGroupProperty to physical edge id.
*
@@ -115,41 +119,36 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
*/
public DAG<Stage, StageEdge> stagePartitionIrDAG(final DAG<IRVertex, IREdge> irDAG) {
final DAGBuilder<Stage, StageEdge> dagOfStagesBuilder = new DAGBuilder<>();
+ final Set<IREdge> interStageEdges = new HashSet<>();
+ final Map<Integer, Stage> stageIdToStageMap = new HashMap<>();
+ final Map<IRVertex, Integer> vertexToStageIdMap = stagePartitioner.apply(irDAG);
- final Map<Integer, List<IRVertex>> vertexListForEachStage = new LinkedHashMap<>();
+ final Map<Integer, Set<IRVertex>> vertexSetForEachStage = new LinkedHashMap<>();
irDAG.topologicalDo(irVertex -> {
- final Integer stageNum = irVertex.getPropertyValue(StageIdProperty.class).get();
- if (!vertexListForEachStage.containsKey(stageNum)) {
- vertexListForEachStage.put(stageNum, new ArrayList<>());
+ final int stageId = vertexToStageIdMap.get(irVertex);
+ if (!vertexSetForEachStage.containsKey(stageId)) {
+ vertexSetForEachStage.put(stageId, new HashSet<>());
}
- vertexListForEachStage.get(stageNum).add(irVertex);
+ vertexSetForEachStage.get(stageId).add(irVertex);
});
- final Map<IRVertex, Stage> vertexStageMap = new HashMap<>();
-
- for (final List<IRVertex> stageVertices : vertexListForEachStage.values()) {
- integrityCheck(stageVertices);
-
- final Set<IRVertex> currentStageVertices = new HashSet<>();
- final Set<StageEdgeBuilder> currentStageIncomingEdges = new HashSet<>();
+ for (final int stageId : vertexSetForEachStage.keySet()) {
+ final Set<IRVertex> stageVertices = vertexSetForEachStage.get(stageId);
+ final String stageIdentifier = RuntimeIdGenerator.generateStageId(stageId);
+ final ExecutionPropertyMap<VertexExecutionProperty> stageProperties = new ExecutionPropertyMap<>(stageIdentifier);
+ stagePartitioner.getStageProperties(stageVertices.iterator().next()).forEach(stageProperties::put);
+ final int stageParallelism = stageProperties.get(ParallelismProperty.class)
+ .orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage"));
- // Create a new stage builder.
- final IRVertex irVertexOfNewStage = stageVertices.stream().findAny()
- .orElseThrow(() -> new RuntimeException("Error: List " + stageVertices.getClass() + " is Empty"));
- final StageBuilder stageBuilder = new StageBuilder(
- irVertexOfNewStage.getPropertyValue(StageIdProperty.class).get(),
- irVertexOfNewStage.getPropertyValue(ParallelismProperty.class).get(),
- irVertexOfNewStage.getPropertyValue(ScheduleGroupIndexProperty.class).get(),
- irVertexOfNewStage.getPropertyValue(ExecutorPlacementProperty.class).get());
+ final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder = new DAGBuilder<>();
- // Prepare useful variables.
- final int stageParallelism = irVertexOfNewStage.getPropertyValue(ParallelismProperty.class).get();
+ // Prepare vertexIdtoReadables
final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism);
for (int i = 0; i < stageParallelism; i++) {
vertexIdToReadables.add(new HashMap<>());
}
- // For each vertex in the stage,
+ // For each IRVertex,
for (final IRVertex irVertex : stageVertices) {
// Take care of the readables of a source vertex.
if (irVertex instanceof SourceVertex) {
@@ -159,108 +158,78 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
for (int i = 0; i < stageParallelism; i++) {
vertexIdToReadables.get(i).put(irVertex.getId(), readables.get(i));
}
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new PhysicalPlanGenerationException(e);
}
-
// Clear internal metadata.
sourceVertex.clearInternalStates();
}
// Add vertex to the stage.
- stageBuilder.addVertex(irVertex);
- currentStageVertices.add(irVertex);
+ stageInternalDAGBuilder.addVertex(irVertex);
+ }
+ for (final IRVertex dstVertex : stageVertices) {
// Connect all the incoming edges for the vertex.
- final List<IREdge> inEdges = irDAG.getIncomingEdgesOf(irVertex);
- final Optional<List<IREdge>> inEdgeList = (inEdges == null) ? Optional.empty() : Optional.of(inEdges);
- inEdgeList.ifPresent(edges -> edges.forEach(irEdge -> {
+ irDAG.getIncomingEdgesOf(dstVertex).forEach(irEdge -> {
final IRVertex srcVertex = irEdge.getSrc();
- final IRVertex dstVertex = irEdge.getDst();
- if (srcVertex == null) {
- throw new IllegalVertexOperationException("Unable to locate srcVertex for IREdge " + irEdge);
- } else if (dstVertex == null) {
- throw new IllegalVertexOperationException("Unable to locate dstVertex for IREdge " + irEdge);
- }
-
- // both vertices are in the stage.
- if (currentStageVertices.contains(srcVertex) && currentStageVertices.contains(dstVertex)) {
- stageBuilder.connectInternalVertices(new RuntimeEdge<IRVertex>(
+ // both vertices are in the same stage.
+ if (vertexToStageIdMap.get(srcVertex).equals(vertexToStageIdMap.get(dstVertex))) {
+ stageInternalDAGBuilder.connectVertices(new RuntimeEdge<>(
irEdge.getId(),
irEdge.getExecutionProperties(),
irEdge.getSrc(),
irEdge.getDst(),
irEdge.isSideInput()));
} else { // edge comes from another stage
- final Stage srcStage = vertexStageMap.get(srcVertex);
-
- if (srcStage == null) {
- throw new IllegalVertexOperationException("srcVertex " + srcVertex.getId()
- + " not yet added to the builder");
- }
-
- final StageEdgeBuilder newEdgeBuilder = new StageEdgeBuilder(irEdge.getId())
- .setEdgeProperties(irEdge.getExecutionProperties())
- .setSrcVertex(srcVertex)
- .setDstVertex(dstVertex)
- .setSrcStage(srcStage)
- .setSideInputFlag(irEdge.isSideInput());
- currentStageIncomingEdges.add(newEdgeBuilder);
+ interStageEdges.add(irEdge);
}
- }));
-
- // Track id to irVertex.
- idToIRVertex.put(irVertex.getId(), irVertex);
+ });
}
- stageBuilder.addReadables(vertexIdToReadables);
// If this runtime stage contains at least one vertex, build it!
- if (!stageBuilder.isEmpty()) {
- final Stage currentStage = stageBuilder.build();
- dagOfStagesBuilder.addVertex(currentStage);
-
- // Add this stage as the destination stage for all the incoming edges.
- currentStageIncomingEdges.forEach(stageEdgeBuilder -> {
- stageEdgeBuilder.setDstStage(currentStage);
- final StageEdge stageEdge = stageEdgeBuilder.build();
- dagOfStagesBuilder.connectVertices(stageEdge);
- });
- currentStageIncomingEdges.clear();
+ if (!stageInternalDAGBuilder.isEmpty()) {
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAG
+ = stageInternalDAGBuilder.buildWithoutSourceSinkCheck();
+ final Stage stage = new Stage(stageIdentifier, stageInternalDAG, stageProperties, vertexIdToReadables);
+ dagOfStagesBuilder.addVertex(stage);
+ stageIdToStageMap.put(stageId, stage);
+ }
+ }
- currentStageVertices.forEach(irVertex -> vertexStageMap.put(irVertex, currentStage));
- currentStageVertices.clear();
+ // Add StageEdges
+ for (final IREdge interStageEdge : interStageEdges) {
+ final Stage srcStage = stageIdToStageMap.get(vertexToStageIdMap.get(interStageEdge.getSrc()));
+ final Stage dstStage = stageIdToStageMap.get(vertexToStageIdMap.get(interStageEdge.getDst()));
+ if (srcStage == null || dstStage == null) {
+ throw new IllegalVertexOperationException(String.format("Stage not added to the builder:%s%s",
+ srcStage == null ? String.format(" source stage for %s", interStageEdge.getSrc()) : "",
+ dstStage == null ? String.format(" destination stage for %s", interStageEdge.getDst()) : ""));
}
+ dagOfStagesBuilder.connectVertices(new StageEdge(interStageEdge.getId(), interStageEdge.getExecutionProperties(),
+ interStageEdge.getSrc(), interStageEdge.getDst(), srcStage, dstStage, interStageEdge.isSideInput()));
}
return dagOfStagesBuilder.build();
}
/**
- * Integrity check for a stage's vertices.
- * @param stageVertices to check for
+ * Integrity check for Stage.
+ * @param stage to check for
*/
- private void integrityCheck(final List<IRVertex> stageVertices) {
- final IRVertex firstVertex = stageVertices.get(0);
- final String placement = firstVertex.getPropertyValue(ExecutorPlacementProperty.class).get();
- final int scheduleGroup = firstVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get();
- final int parallelism = firstVertex.getPropertyValue(ParallelismProperty.class).get();
+ private void integrityCheck(final Stage stage) {
+ stage.getPropertyValue(ParallelismProperty.class)
+ .orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage"));
+ stage.getPropertyValue(ScheduleGroupIndexProperty.class)
+ .orElseThrow(() -> new RuntimeException("ScheduleGroupIndex property must be set for Stage"));
- stageVertices.forEach(irVertex -> {
+ stage.getIRDAG().getVertices().forEach(irVertex -> {
// Check vertex type.
if (!(irVertex instanceof SourceVertex
|| irVertex instanceof OperatorVertex
|| irVertex instanceof MetricCollectionBarrierVertex)) {
throw new UnsupportedOperationException(irVertex.toString());
}
-
- // Check execution properties.
- if ((placement != null
- && !placement.equals(irVertex.getPropertyValue(ExecutorPlacementProperty.class).get()))
- || scheduleGroup != irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get()
- || parallelism != irVertex.getPropertyValue(ParallelismProperty.class).get()) {
- throw new RuntimeException("Vertices of the same stage have different execution properties: "
- + irVertex.getId());
- }
});
}
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
index 1280f5b..ee0a337 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
@@ -18,48 +18,52 @@ package edu.snu.nemo.runtime.common.plan;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.Vertex;
import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import org.apache.commons.lang3.SerializationUtils;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* Stage.
*/
public final class Stage extends Vertex {
private final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag;
- private final int parallelism;
- private final int scheduleGroupIndex;
- private final String containerType;
private final byte[] serializedIRDag;
+ private final ExecutionPropertyMap<VertexExecutionProperty> executionProperties;
private final List<Map<String, Readable>> vertexIdToReadables;
+ private final int parallelism;
+ private final int scheduleGroupIndex;
/**
* Constructor.
*
* @param stageId ID of the stage.
* @param irDag the DAG of the task in this stage.
- * @param parallelism how many tasks will be executed in this stage.
- * @param scheduleGroupIndex the schedule group index.
- * @param containerType the type of container to execute the task on.
+ * @param executionProperties set of {@link VertexExecutionProperty} for this stage
* @param vertexIdToReadables the list of maps between vertex ID and {@link Readable}.
*/
public Stage(final String stageId,
final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag,
- final int parallelism,
- final int scheduleGroupIndex,
- final String containerType,
+ final ExecutionPropertyMap<VertexExecutionProperty> executionProperties,
final List<Map<String, Readable>> vertexIdToReadables) {
super(stageId);
this.irDag = irDag;
- this.parallelism = parallelism;
- this.scheduleGroupIndex = scheduleGroupIndex;
- this.containerType = containerType;
this.serializedIRDag = SerializationUtils.serialize(irDag);
+ this.executionProperties = executionProperties;
this.vertexIdToReadables = vertexIdToReadables;
+ this.parallelism = executionProperties.get(ParallelismProperty.class)
+ .orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage"));
+ this.scheduleGroupIndex = executionProperties.get(ScheduleGroupIndexProperty.class)
+ .orElseThrow(() -> new RuntimeException("ScheduleGroupIndex property must be set for Stage"));
}
/**
@@ -95,10 +99,22 @@ public final class Stage extends Vertex {
}
/**
- * @return the type of container to execute the task on.
+ * @return {@link VertexExecutionProperty} map for this stage
+ */
+ public ExecutionPropertyMap<VertexExecutionProperty> getExecutionProperties() {
+ return executionProperties;
+ }
+
+ /**
+ * Get the executionProperty of this stage.
+ *
+ * @param <T> Type of the return value.
+ * @param executionPropertyKey key of the execution property.
+ * @return the execution property.
*/
- public String getContainerType() {
- return containerType;
+ public <T extends Serializable> Optional<T> getPropertyValue(
+ final Class<? extends VertexExecutionProperty<T>> executionPropertyKey) {
+ return executionProperties.get(executionPropertyKey);
}
/**
@@ -114,7 +130,7 @@ public final class Stage extends Vertex {
sb.append("{\"scheduleGroupIndex\": ").append(scheduleGroupIndex);
sb.append(", \"irDag\": ").append(irDag);
sb.append(", \"parallelism\": ").append(parallelism);
- sb.append(", \"containerType\": \"").append(containerType).append("\"");
+ sb.append(", \"executionProperties\": ").append(executionProperties);
sb.append('}');
return sb.toString();
}
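The stage-level property lookup that replaces the ad-hoc parallelism, scheduleGroupIndex and containerType constructor arguments can be illustrated in isolation. The following is a minimal sketch, not the Nemo implementation: Property, Parallelism, ContainerType and PropertyMap are hypothetical stand-ins for VertexExecutionProperty, ParallelismProperty and ExecutionPropertyMap, and only the Optional-based lookup with orElseThrow mirrors the code in the diff.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public final class StagePropertySketch {
  /** Hypothetical stand-in for VertexExecutionProperty: a typed, serializable value. */
  public abstract static class Property<T extends Serializable> {
    private final T value;

    Property(final T value) {
      this.value = value;
    }

    T getValue() {
      return value;
    }
  }

  /** Hypothetical stand-in for ParallelismProperty. */
  public static final class Parallelism extends Property<Integer> {
    public Parallelism(final Integer value) {
      super(value);
    }
  }

  /** Hypothetical property standing in for a container type setting. */
  public static final class ContainerType extends Property<String> {
    public ContainerType(final String value) {
      super(value);
    }
  }

  /** Hypothetical stand-in for ExecutionPropertyMap: properties keyed by their class. */
  public static final class PropertyMap {
    private final Map<Class<?>, Property<?>> properties = new HashMap<>();

    public void put(final Property<?> property) {
      properties.put(property.getClass(), property);
    }

    /** Returns the value stored under the given property class, or empty if it is unset. */
    @SuppressWarnings("unchecked")
    public <T extends Serializable> Optional<T> get(final Class<? extends Property<T>> key) {
      final Property<T> property = (Property<T>) properties.get(key);
      return property == null ? Optional.empty() : Optional.of(property.getValue());
    }
  }

  /** Mandatory properties fail fast, as the Stage constructor in the diff does. */
  public static int requireParallelism(final PropertyMap map) {
    return map.get(Parallelism.class)
        .orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage"));
  }
}
```

Keying the map by property class lets a caller ask for any property without the Stage class growing a dedicated field and getter per property, which is what allows containerType to be dropped as a constructor argument.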
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java
deleted file mode 100644
index 04475d2..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan;
-
-import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.common.dag.DAGBuilder;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Stage Builder.
- */
-final class StageBuilder {
- private final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder;
- private final Integer stageId;
- private final int parallelism;
- private final int scheduleGroupIndex;
- private final String containerType;
- private List<Map<String, Readable>> vertexIdToReadables;
-
- /**
- * Builds a {@link Stage}.
- * @param stageId stage ID in numeric form.
- * @param scheduleGroupIndex indicating its scheduling order.
- */
- StageBuilder(final Integer stageId,
- final int parallelism,
- final int scheduleGroupIndex,
- final String containerType) {
- this.stageId = stageId;
- this.parallelism = parallelism;
- this.scheduleGroupIndex = scheduleGroupIndex;
- this.containerType = containerType;
- this.stageInternalDAGBuilder = new DAGBuilder<>();
- this.vertexIdToReadables = new ArrayList<>(1);
- }
-
- /**
- * Adds a {@link IRVertex} to this stage.
- * @param vertex to add.
- * @return the stageBuilder.
- */
- StageBuilder addVertex(final IRVertex vertex) {
- stageInternalDAGBuilder.addVertex(vertex);
- return this; }
-
- /**
- * Connects two {@link IRVertex} in this stage.
- * @param edge the IREdge that connects vertices.
- * @return the stageBuilder.
- */
- StageBuilder connectInternalVertices(final RuntimeEdge<IRVertex> edge) {
- stageInternalDAGBuilder.connectVertices(edge);
- return this;
- }
-
- StageBuilder addReadables(final List<Map<String, Readable>> vertexIdToReadable) {
- this.vertexIdToReadables = vertexIdToReadable;
- return this;
- }
-
- /**
- * @return true if this builder doesn't contain any valid {@link IRVertex}.
- */
- boolean isEmpty() {
- return stageInternalDAGBuilder.isEmpty();
- }
-
- /**
- * Builds and returns the {@link Stage}.
- * @return the runtime stage.
- */
- Stage build() {
- final Stage stage = new Stage(
- RuntimeIdGenerator.generateStageId(stageId),
- stageInternalDAGBuilder.buildWithoutSourceSinkCheck(),
- parallelism,
- scheduleGroupIndex,
- containerType,
- vertexIdToReadables);
- return stage;
- }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 608da4d..d2697f4 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -15,6 +15,7 @@
*/
package edu.snu.nemo.runtime.common.plan;
+import com.google.common.annotations.VisibleForTesting;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.runtime.common.data.KeyRange;
@@ -56,7 +57,8 @@ public final class StageEdge extends RuntimeEdge<Stage> {
* @param dstStage destination stage.
* @param isSideInput whether or not the edge is a sideInput edge.
*/
- StageEdge(final String runtimeEdgeId,
+ @VisibleForTesting
+ public StageEdge(final String runtimeEdgeId,
final ExecutionPropertyMap edgeProperties,
final IRVertex srcVertex,
final IRVertex dstVertex,
@@ -91,7 +93,7 @@ public final class StageEdge extends RuntimeEdge<Stage> {
public String propertiesToJSON() {
final StringBuilder sb = new StringBuilder();
sb.append("{\"runtimeEdgeId\": \"").append(getId());
- sb.append("\", \"edgeProperties\": ").append(getExecutionProperties());
+ sb.append("\", \"executionProperties\": ").append(getExecutionProperties());
sb.append(", \"externalSrcVertexId\": \"").append(srcVertex.getId());
sb.append("\", \"externalDstVertexId\": \"").append(dstVertex.getId());
sb.append("\"}");
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
deleted file mode 100644
index f94feca..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan;
-
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
-
-/**
- * Stage Edge Builder.
- */
-public final class StageEdgeBuilder {
- private final String stageEdgeId;
- private ExecutionPropertyMap edgeProperties;
- private Stage srcStage;
- private Stage dstStage;
- private IRVertex srcVertex;
- private IRVertex dstVertex;
- private Boolean isSideInput;
-
- /**
- * Represents the edge between vertices in a logical plan.
- *
- * @param irEdgeId id of this edge.
- */
- public StageEdgeBuilder(final String irEdgeId) {
- this.stageEdgeId = irEdgeId;
- }
-
- /**
- * Setter for edge properties.
- *
- * @param ea the edge properties.
- * @return the updated StageEdgeBuilder.
- */
- public StageEdgeBuilder setEdgeProperties(final ExecutionPropertyMap ea) {
- this.edgeProperties = ea;
- return this;
- }
-
- /**
- * Setter for the source stage.
- *
- * @param ss the source stage.
- * @return the updated StageEdgeBuilder.
- */
- public StageEdgeBuilder setSrcStage(final Stage ss) {
- this.srcStage = ss;
- return this;
- }
-
- /**
- * Setter for the destination stage.
- *
- * @param ds the destination stage.
- * @return the updated StageEdgeBuilder.
- */
- public StageEdgeBuilder setDstStage(final Stage ds) {
- this.dstStage = ds;
- return this;
- }
-
- /**
- * Setter for the source vertex.
- *
- * @param sv the source vertex.
- * @return the updated StageEdgeBuilder.
- */
- public StageEdgeBuilder setSrcVertex(final IRVertex sv) {
- this.srcVertex = sv;
- return this;
- }
-
- /**
- * Setter for the destination vertex.
- *
- * @param dv the destination vertex.
- * @return the updated StageEdgeBuilder.
- */
- public StageEdgeBuilder setDstVertex(final IRVertex dv) {
- this.dstVertex = dv;
- return this;
- }
-
- /**
- * Setter for side input flag.
- *
- * @param sideInputFlag the side input flag.
- * @return the updated StageEdgeBuilder.
- */
- public StageEdgeBuilder setSideInputFlag(final Boolean sideInputFlag) {
- this.isSideInput = sideInputFlag;
- return this;
- }
-
- /**
- * @return the built stage edge.
- */
- public StageEdge build() {
- return new StageEdge(stageEdgeId, edgeProperties, srcVertex, dstVertex, srcStage, dstStage, isSideInput);
- }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
new file mode 100644
index 0000000..49553dc
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.plan;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import net.jcip.annotations.ThreadSafe;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.reef.annotations.audience.DriverSide;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A function that is responsible for stage partitioning on IR DAG.
+ * Each stage is a maximal set of {@link IRVertex} such that
+ * <ul>
+ * <li>branches and non-OneToOne edges are not allowed within a stage, and</li>
+ * <li>all vertices in a stage should have the same {@link VertexExecutionProperty} map,
+ * except for the ignored properties.</li>
+ * </ul>
+ */
+@DriverSide
+@ThreadSafe
+public final class StagePartitioner implements Function<DAG<IRVertex, IREdge>, Map<IRVertex, Integer>> {
+ private final Set<Class<? extends VertexExecutionProperty>> ignoredPropertyKeys = ConcurrentHashMap.newKeySet();
+
+ @Inject
+ private StagePartitioner() {
+ }
+
+ /**
+ * By default, the stage partitioner merges two vertices into one stage if and only if the two vertices have
+ * the same set of {@link VertexExecutionProperty}.
+ * Invoking this method makes the stage partitioner ignore the given property when comparing
+ * the execution property maps.
+ * @param ignoredPropertyKey a property that will be ignored during the stage partitioning.
+ */
+ public void addIgnoredPropertyKey(final Class<? extends VertexExecutionProperty> ignoredPropertyKey) {
+ ignoredPropertyKeys.add(ignoredPropertyKey);
+ }
+
+ /**
+ * @param irDAG IR DAG to perform stage partition on.
+ * @return a map between IR vertex and the corresponding stage id
+ */
+ @Override
+ public Map<IRVertex, Integer> apply(final DAG<IRVertex, IREdge> irDAG) {
+ final MutableInt nextStageIndex = new MutableInt(0);
+ final Map<IRVertex, Integer> vertexToStageIdMap = new HashMap<>();
+ irDAG.topologicalDo(irVertex -> {
+ // Base case: for root vertices
+ if (vertexToStageIdMap.get(irVertex) == null) {
+ vertexToStageIdMap.put(irVertex, nextStageIndex.getValue());
+ nextStageIndex.increment();
+ }
+ // Get stage id of irVertex
+ final int stageId = vertexToStageIdMap.get(irVertex);
+ // Step case: inductively assign stage ids based on mergability with irVertex
+ for (final IREdge edge : irDAG.getOutgoingEdgesOf(irVertex)) {
+ final IRVertex connectedIRVertex = edge.getDst();
+ // Skip if it already has been assigned stageId
+ if (vertexToStageIdMap.containsKey(connectedIRVertex)) {
+ continue;
+ }
+ // Assign stageId
+ if (testMergability(edge, irDAG)) {
+ vertexToStageIdMap.put(connectedIRVertex, stageId);
+ } else {
+ vertexToStageIdMap.put(connectedIRVertex, nextStageIndex.getValue());
+ nextStageIndex.increment();
+ }
+ }
+ });
+ return vertexToStageIdMap;
+ }
+
+ /**
+ * @param edge an {@link IREdge}.
+ * @param dag IR DAG which contains {@code edge}
+ * @return {@code true} if and only if the source and the destination vertex of the edge can be merged into one stage.
+ */
+ private boolean testMergability(final IREdge edge, final DAG<IRVertex, IREdge> dag) {
+ // If the destination vertex has multiple inEdges, return false
+ if (dag.getIncomingEdgesOf(edge.getDst()).size() > 1) {
+ return false;
+ }
+ // If the edge is not OneToOne, return false
+ if (edge.getPropertyValue(DataCommunicationPatternProperty.class).get()
+ != DataCommunicationPatternProperty.Value.OneToOne) {
+ return false;
+ }
+ // Return true if and only if the execution properties of the two vertices are compatible
+ return getStageProperties(edge.getSrc()).equals(getStageProperties(edge.getDst()));
+ }
+
+ /**
+ * @param vertex a vertex in a stage
+ * @return set of stage-level properties for the stage
+ */
+ public Set<VertexExecutionProperty> getStageProperties(final IRVertex vertex) {
+ final Stream<VertexExecutionProperty> stream = vertex.getExecutionProperties().stream();
+ return stream.filter(p -> !ignoredPropertyKeys.contains(p.getClass())).collect(Collectors.toSet());
+ }
+}
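The partitioning rule in StagePartitioner.apply above can be sketched on a toy graph. This is a minimal sketch under stated assumptions, not the Nemo implementation: vertices are plain strings, Edge is a hypothetical stand-in for IREdge, the topological order is supplied by the caller instead of DAG.topologicalDo, and each vertex's stage-level properties (with ignored keys already filtered out) are modelled as a set of strings.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public final class StagePartitionSketch {
  /** Hypothetical stand-in for IREdge: endpoints plus whether the pattern is OneToOne. */
  public static final class Edge {
    final String src;
    final String dst;
    final boolean oneToOne;

    public Edge(final String src, final String dst, final boolean oneToOne) {
      this.src = src;
      this.dst = dst;
      this.oneToOne = oneToOne;
    }
  }

  /** Assigns a stage id to every vertex, visiting vertices in the given topological order. */
  public static Map<String, Integer> partition(final List<String> topologicalOrder,
                                               final List<Edge> edges,
                                               final Map<String, Set<String>> stageProperties) {
    final Map<String, Integer> inDegree = new HashMap<>();
    for (final Edge edge : edges) {
      inDegree.merge(edge.dst, 1, Integer::sum);
    }
    final Map<String, Integer> vertexToStageId = new LinkedHashMap<>();
    int nextStageId = 0;
    for (final String vertex : topologicalOrder) {
      // Base case: a vertex not yet reached through a parent opens a new stage.
      if (!vertexToStageId.containsKey(vertex)) {
        vertexToStageId.put(vertex, nextStageId++);
      }
      final int stageId = vertexToStageId.get(vertex);
      // Step case: a child joins this stage only if the connecting edge is mergeable.
      for (final Edge edge : edges) {
        if (!edge.src.equals(vertex) || vertexToStageId.containsKey(edge.dst)) {
          continue;
        }
        final boolean mergeable = inDegree.get(edge.dst) == 1
            && edge.oneToOne
            && stageProperties.get(edge.src).equals(stageProperties.get(edge.dst));
        vertexToStageId.put(edge.dst, mergeable ? stageId : nextStageId++);
      }
    }
    return vertexToStageId;
  }

  public static void main(final String[] args) {
    final Map<String, Set<String>> props = new HashMap<>();
    props.put("A", Collections.singleton("parallelism=2"));
    props.put("B", Collections.singleton("parallelism=2"));
    props.put("C", Collections.singleton("parallelism=2"));
    props.put("D", Collections.singleton("parallelism=4"));
    final List<Edge> edges = Arrays.asList(
        new Edge("A", "B", true),   // OneToOne, same properties: merged
        new Edge("B", "C", false),  // not OneToOne: stage boundary
        new Edge("C", "D", true));  // OneToOne, different properties: stage boundary
    // Prints {A=0, B=0, C=1, D=2}
    System.out.println(partition(Arrays.asList("A", "B", "C", "D"), edges, props));
  }
}
```

In this example only A and B share a stage: the B to C edge is not OneToOne, and D differs from C in its properties.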
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index 7f2a203..21ea23a 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -16,10 +16,13 @@
package edu.snu.nemo.runtime.common.plan;
import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* A Task is a self-contained executable that can be executed on a machine.
@@ -30,7 +33,7 @@ public final class Task implements Serializable {
private final List<StageEdge> taskIncomingEdges;
private final List<StageEdge> taskOutgoingEdges;
private final int attemptIdx;
- private final String containerType;
+ private final ExecutionPropertyMap<VertexExecutionProperty> executionProperties;
private final byte[] serializedIRDag;
private final Map<String, Readable> irVertexIdToReadable;
@@ -40,7 +43,7 @@ public final class Task implements Serializable {
* @param jobId the id of the job.
* @param taskId the ID of the task.
* @param attemptIdx the attempt index.
- * @param containerType the type of container to execute the task on.
+ * @param executionProperties {@link VertexExecutionProperty} map for the corresponding stage
* @param serializedIRDag the serialized DAG of the task.
* @param taskIncomingEdges the incoming edges of the task.
* @param taskOutgoingEdges the outgoing edges of the task.
@@ -49,7 +52,7 @@ public final class Task implements Serializable {
public Task(final String jobId,
final String taskId,
final int attemptIdx,
- final String containerType,
+ final ExecutionPropertyMap<VertexExecutionProperty> executionProperties,
final byte[] serializedIRDag,
final List<StageEdge> taskIncomingEdges,
final List<StageEdge> taskOutgoingEdges,
@@ -57,7 +60,7 @@ public final class Task implements Serializable {
this.jobId = jobId;
this.taskId = taskId;
this.attemptIdx = attemptIdx;
- this.containerType = containerType;
+ this.executionProperties = executionProperties;
this.serializedIRDag = serializedIRDag;
this.taskIncomingEdges = taskIncomingEdges;
this.taskOutgoingEdges = taskOutgoingEdges;
@@ -107,10 +110,22 @@ public final class Task implements Serializable {
}
/**
- * @return the type of container to execute the task on.
+ * @return {@link VertexExecutionProperty} map for the corresponding stage
*/
- public String getContainerType() {
- return containerType;
+ public ExecutionPropertyMap<VertexExecutionProperty> getExecutionProperties() {
+ return executionProperties;
+ }
+
+ /**
+ * Get the executionProperty of this task.
+ *
+ * @param <T> Type of the return value.
+ * @param executionPropertyKey key of the execution property.
+ * @return the execution property.
+ */
+ public <T extends Serializable> Optional<T> getPropertyValue(
+ final Class<? extends VertexExecutionProperty<T>> executionPropertyKey) {
+ return executionProperties.get(executionPropertyKey);
}
/**
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index 86a18cb..52c55fd 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -21,7 +21,7 @@ import edu.snu.nemo.common.exception.BlockFetchException;
import edu.snu.nemo.common.exception.BlockWriteException;
import edu.snu.nemo.common.exception.UnsupportedBlockStoreException;
import edu.snu.nemo.common.exception.UnsupportedExecutionPropertyException;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.UsedDataHandlingProperty;
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
@@ -122,7 +122,7 @@ public final class BlockManagerWorker {
* @throws BlockWriteException for any error occurred while trying to create a block.
*/
public Block createBlock(final String blockId,
- final DataStoreProperty.Value blockStore) throws BlockWriteException {
+ final InterTaskDataStoreProperty.Value blockStore) throws BlockWriteException {
final BlockStore store = getBlockStore(blockStore);
return store.createBlock(blockId);
}
@@ -137,7 +137,7 @@ public final class BlockManagerWorker {
*/
private CompletableFuture<DataUtil.IteratorWithNumBytes> retrieveDataFromBlock(
final String blockId,
- final DataStoreProperty.Value blockStore,
+ final InterTaskDataStoreProperty.Value blockStore,
final KeyRange keyRange) {
LOG.info("RetrieveDataFromBlock: {}", blockId);
final BlockStore store = getBlockStore(blockStore);
@@ -188,7 +188,7 @@ public final class BlockManagerWorker {
public CompletableFuture<DataUtil.IteratorWithNumBytes> queryBlock(
final String blockId,
final String runtimeEdgeId,
- final DataStoreProperty.Value blockStore,
+ final InterTaskDataStoreProperty.Value blockStore,
final KeyRange keyRange) {
// Let's see if a remote worker has it
final CompletableFuture<ControlMessage.Message> blockLocationFuture =
@@ -261,7 +261,7 @@ public final class BlockManagerWorker {
* @param usedDataHandling how to handle the used block.
*/
public void writeBlock(final Block block,
- final DataStoreProperty.Value blockStore,
+ final InterTaskDataStoreProperty.Value blockStore,
final boolean reportPartitionSizes,
final Map<Integer, Long> partitionSizeMap,
final String srcIRVertexId,
@@ -289,7 +289,7 @@ public final class BlockManagerWorker {
.setBlockId(blockId)
.setState(ControlMessage.BlockStateFromExecutor.AVAILABLE);
- if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
+ if (InterTaskDataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE);
} else {
blockStateChangedMsgBuilder.setLocation(executorId);
@@ -335,7 +335,7 @@ public final class BlockManagerWorker {
* @param blockStore the store which contains the block.
*/
public void removeBlock(final String blockId,
- final DataStoreProperty.Value blockStore) {
+ final InterTaskDataStoreProperty.Value blockStore) {
LOG.info("RemoveBlock: {}", blockId);
final BlockStore store = getBlockStore(blockStore);
final boolean deleted = store.deleteBlock(blockId);
@@ -347,7 +347,7 @@ public final class BlockManagerWorker {
.setBlockId(blockId)
.setState(ControlMessage.BlockStateFromExecutor.NOT_AVAILABLE);
- if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
+ if (InterTaskDataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE);
} else {
blockStateChangedMsgBuilder.setLocation(executorId);
@@ -371,7 +371,7 @@ public final class BlockManagerWorker {
* @param blockStore the store which contains the block.
* @param blockId the ID of the block.
*/
- private void handleUsedData(final DataStoreProperty.Value blockStore,
+ private void handleUsedData(final InterTaskDataStoreProperty.Value blockStore,
final String blockId) {
final AtomicInteger remainingExpectedRead = blockToRemainingRead.get(blockId);
if (remainingExpectedRead != null) {
@@ -389,11 +389,11 @@ public final class BlockManagerWorker {
}
/**
- * Gets the {@link BlockStore} from annotated value of {@link DataStoreProperty}.
- * @param blockStore the annotated value of {@link DataStoreProperty}.
+ * Gets the {@link BlockStore} from annotated value of {@link InterTaskDataStoreProperty}.
+ * @param blockStore the annotated value of {@link InterTaskDataStoreProperty}.
* @return the block store.
*/
- private BlockStore getBlockStore(final DataStoreProperty.Value blockStore) {
+ private BlockStore getBlockStore(final InterTaskDataStoreProperty.Value blockStore) {
switch (blockStore) {
case MemoryStore:
return memoryStore;
@@ -420,7 +420,7 @@ public final class BlockManagerWorker {
public void onOutputContext(final ByteOutputContext outputContext) throws InvalidProtocolBufferException {
final ByteTransferContextDescriptor descriptor = ByteTransferContextDescriptor.PARSER
.parseFrom(outputContext.getContextDescriptor());
- final DataStoreProperty.Value blockStore = convertBlockStore(descriptor.getBlockStore());
+ final InterTaskDataStoreProperty.Value blockStore = convertBlockStore(descriptor.getBlockStore());
final String blockId = descriptor.getBlockId();
final KeyRange keyRange = SerializationUtils.deserialize(descriptor.getKeyRange().toByteArray());
@@ -430,8 +430,8 @@ public final class BlockManagerWorker {
try {
final Optional<Block> optionalBlock = getBlockStore(blockStore).readBlock(blockId);
if (optionalBlock.isPresent()) {
- if (DataStoreProperty.Value.LocalFileStore.equals(blockStore)
- || DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
+ if (InterTaskDataStoreProperty.Value.LocalFileStore.equals(blockStore)
+ || InterTaskDataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
final List<FileArea> fileAreas = ((FileBlock) optionalBlock.get()).asFileAreas(keyRange);
for (final FileArea fileArea : fileAreas) {
try (ByteOutputContext.ByteOutputStream os = outputContext.newOutputStream()) {
@@ -475,10 +475,10 @@ public final class BlockManagerWorker {
/**
* Decodes BlockStore property from protocol buffer.
* @param blockStore property from protocol buffer
- * @return the corresponding {@link DataStoreProperty} value
+ * @return the corresponding {@link InterTaskDataStoreProperty} value
*/
private static ControlMessage.BlockStore convertBlockStore(
- final DataStoreProperty.Value blockStore) {
+ final InterTaskDataStoreProperty.Value blockStore) {
switch (blockStore) {
case MemoryStore:
return ControlMessage.BlockStore.MEMORY;
@@ -495,21 +495,21 @@ public final class BlockManagerWorker {
/**
- * Encodes {@link DataStoreProperty} value into protocol buffer property.
- * @param blockStoreType {@link DataStoreProperty} value
+ * Encodes {@link InterTaskDataStoreProperty} value into protocol buffer property.
+ * @param blockStoreType {@link InterTaskDataStoreProperty} value
* @return the corresponding {@link ControlMessage.BlockStore} value
*/
- private static DataStoreProperty.Value convertBlockStore(
+ private static InterTaskDataStoreProperty.Value convertBlockStore(
final ControlMessage.BlockStore blockStoreType) {
switch (blockStoreType) {
case MEMORY:
- return DataStoreProperty.Value.MemoryStore;
+ return InterTaskDataStoreProperty.Value.MemoryStore;
case SER_MEMORY:
- return DataStoreProperty.Value.SerializedMemoryStore;
+ return InterTaskDataStoreProperty.Value.SerializedMemoryStore;
case LOCAL_FILE:
- return DataStoreProperty.Value.LocalFileStore;
+ return InterTaskDataStoreProperty.Value.LocalFileStore;
case REMOTE_FILE:
- return DataStoreProperty.Value.GlusterFileStore;
+ return InterTaskDataStoreProperty.Value.GlusterFileStore;
default:
throw new UnsupportedBlockStoreException(new Exception("This block store is not yet supported"));
}
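The two convertBlockStore overloads above translate between the renamed InterTaskDataStoreProperty.Value and the protocol buffer ControlMessage.BlockStore, one in each direction. A minimal sketch of that pairing follows, assuming the mapping is one-to-one as the visible cases suggest; StoreValue and WireStore are hypothetical enums standing in for the two real types, and the encode cases other than MemoryStore are inferred by symmetry with the decode cases shown in the diff.

```java
public final class BlockStoreMappingSketch {
  /** Hypothetical stand-in for InterTaskDataStoreProperty.Value. */
  public enum StoreValue { MemoryStore, SerializedMemoryStore, LocalFileStore, GlusterFileStore }

  /** Hypothetical stand-in for ControlMessage.BlockStore. */
  public enum WireStore { MEMORY, SER_MEMORY, LOCAL_FILE, REMOTE_FILE }

  /** Property value to wire value (assumed symmetric to decode). */
  public static WireStore encode(final StoreValue value) {
    switch (value) {
      case MemoryStore:
        return WireStore.MEMORY;
      case SerializedMemoryStore:
        return WireStore.SER_MEMORY;
      case LocalFileStore:
        return WireStore.LOCAL_FILE;
      case GlusterFileStore:
        return WireStore.REMOTE_FILE;
      default:
        throw new IllegalArgumentException("Unsupported block store: " + value);
    }
  }

  /** Wire value back to property value, following the cases listed in the diff. */
  public static StoreValue decode(final WireStore wire) {
    switch (wire) {
      case MEMORY:
        return StoreValue.MemoryStore;
      case SER_MEMORY:
        return StoreValue.SerializedMemoryStore;
      case LOCAL_FILE:
        return StoreValue.LocalFileStore;
      case REMOTE_FILE:
        return StoreValue.GlusterFileStore;
      default:
        throw new IllegalArgumentException("Unsupported block store: " + wire);
    }
  }
}
```

Because the rename touches only the Java-side property type, the wire enum and therefore the serialized messages are unaffected in this sketch.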
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index fdaecd5..2b29b5f 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.executor.datatransfer;
import com.google.common.annotations.VisibleForTesting;
import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
import edu.snu.nemo.common.ir.vertex.IRVertex;
@@ -87,13 +87,15 @@ public final class InputReader extends DataTransfer {
private CompletableFuture<DataUtil.IteratorWithNumBytes> readOneToOne() {
final String blockId = getBlockId(dstTaskIndex);
- final Optional<DataStoreProperty.Value> dataStoreProperty = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+ final Optional<InterTaskDataStoreProperty.Value> dataStoreProperty
+ = runtimeEdge.getPropertyValue(InterTaskDataStoreProperty.class);
return blockManagerWorker.queryBlock(blockId, getId(), dataStoreProperty.get(), HashRange.all());
}
private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readBroadcast() {
final int numSrcTasks = this.getSourceParallelism();
- final Optional<DataStoreProperty.Value> dataStoreProperty = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+ final Optional<InterTaskDataStoreProperty.Value> dataStoreProperty
+ = runtimeEdge.getPropertyValue(InterTaskDataStoreProperty.class);
final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
@@ -111,7 +113,8 @@ public final class InputReader extends DataTransfer {
*/
private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readDataInRange() {
assert (runtimeEdge instanceof StageEdge);
- final Optional<DataStoreProperty.Value> dataStoreProperty = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+ final Optional<InterTaskDataStoreProperty.Value> dataStoreProperty
+ = runtimeEdge.getPropertyValue(InterTaskDataStoreProperty.class);
final KeyRange hashRangeToRead =
((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
if (hashRangeToRead == null) {
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index b0dd080..bb594f6 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -36,7 +36,7 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
private final RuntimeEdge<?> runtimeEdge;
private final String srcVertexId;
private final IRVertex dstIrVertex;
- private final DataStoreProperty.Value blockStoreValue;
+ private final InterTaskDataStoreProperty.Value blockStoreValue;
private final BlockManagerWorker blockManagerWorker;
private final boolean nonDummyBlock;
private final Block blockToWrite;
@@ -65,7 +65,7 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
this.srcVertexId = srcRuntimeVertexId;
this.dstIrVertex = dstIrVertex;
this.blockManagerWorker = blockManagerWorker;
- this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class).get();
+ this.blockStoreValue = runtimeEdge.getPropertyValue(InterTaskDataStoreProperty.class).get();
// Setup partitioner
final int dstParallelism = getDstParallelism();
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 0b119d8..88dda9d 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -19,9 +19,11 @@ import edu.snu.nemo.common.coder.*;
import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.*;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
import edu.snu.nemo.common.ir.vertex.SourceVertex;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
import edu.snu.nemo.common.test.EmptyComponents;
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.common.Pair;
@@ -36,7 +38,7 @@ import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.plan.StageEdgeBuilder;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.executor.Executor;
import edu.snu.nemo.runtime.executor.MetricManagerWorker;
import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
@@ -91,10 +93,10 @@ import static org.mockito.Mockito.mock;
SourceVertex.class})
public final class DataTransferTest {
private static final String EXECUTOR_ID_PREFIX = "Executor";
- private static final DataStoreProperty.Value MEMORY_STORE = DataStoreProperty.Value.MemoryStore;
- private static final DataStoreProperty.Value SER_MEMORY_STORE = DataStoreProperty.Value.SerializedMemoryStore;
- private static final DataStoreProperty.Value LOCAL_FILE_STORE = DataStoreProperty.Value.LocalFileStore;
- private static final DataStoreProperty.Value REMOTE_FILE_STORE = DataStoreProperty.Value.GlusterFileStore;
+ private static final InterTaskDataStoreProperty.Value MEMORY_STORE = InterTaskDataStoreProperty.Value.MemoryStore;
+ private static final InterTaskDataStoreProperty.Value SER_MEMORY_STORE = InterTaskDataStoreProperty.Value.SerializedMemoryStore;
+ private static final InterTaskDataStoreProperty.Value LOCAL_FILE_STORE = InterTaskDataStoreProperty.Value.LocalFileStore;
+ private static final InterTaskDataStoreProperty.Value REMOTE_FILE_STORE = InterTaskDataStoreProperty.Value.GlusterFileStore;
private static final String TMP_LOCAL_FILE_DIRECTORY = "./tmpLocalFiles";
private static final String TMP_REMOTE_FILE_DIRECTORY = "./tmpRemoteFiles";
private static final int PARALLELISM_TEN = 10;
@@ -295,7 +297,7 @@ public final class DataTransferTest {
private void writeAndRead(final BlockManagerWorker sender,
final BlockManagerWorker receiver,
final DataCommunicationPatternProperty.Value commPattern,
- final DataStoreProperty.Value store) throws RuntimeException {
+ final InterTaskDataStoreProperty.Value store) throws RuntimeException {
final int testIndex = TEST_INDEX.getAndIncrement();
final String edgeId = String.format(EDGE_PREFIX_TEMPLATE, testIndex);
final Pair<IRVertex, IRVertex> verticesPair = setupVertices(edgeId, sender, receiver);
@@ -307,7 +309,7 @@ public final class DataTransferTest {
dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
dummyIREdge.setProperty(DataCommunicationPatternProperty.of(commPattern));
dummyIREdge.setProperty(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
- dummyIREdge.setProperty(DataStoreProperty.of(store));
+ dummyIREdge.setProperty(InterTaskDataStoreProperty.of(store));
dummyIREdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
dummyIREdge.setProperty(EncoderProperty.of(ENCODER_FACTORY));
dummyIREdge.setProperty(DecoderProperty.of(DECODER_FACTORY));
@@ -318,14 +320,8 @@ public final class DataTransferTest {
final IRVertex dstMockVertex = mock(IRVertex.class);
final Stage srcStage = setupStages("srcStage-" + testIndex);
final Stage dstStage = setupStages("dstStage-" + testIndex);
- dummyEdge = new StageEdgeBuilder(edgeId)
- .setEdgeProperties(edgeProperties)
- .setSrcVertex(srcMockVertex)
- .setDstVertex(dstMockVertex)
- .setSrcStage(srcStage)
- .setDstStage(dstStage)
- .setSideInputFlag(false)
- .build();
+ dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
+ srcStage, dstStage, false);
// Initialize states in Master
srcStage.getTaskIds().forEach(srcTaskId -> {
@@ -384,7 +380,7 @@ public final class DataTransferTest {
private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
final BlockManagerWorker receiver,
final DataCommunicationPatternProperty.Value commPattern,
- final DataStoreProperty.Value store) throws RuntimeException {
+ final InterTaskDataStoreProperty.Value store) throws RuntimeException {
final int testIndex = TEST_INDEX.getAndIncrement();
final int testIndex2 = TEST_INDEX.getAndIncrement();
final String edgeId = String.format(EDGE_PREFIX_TEMPLATE, testIndex);
@@ -405,7 +401,7 @@ public final class DataTransferTest {
= dummyIREdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
duplicateDataProperty.get().setRepresentativeEdgeId(edgeId);
duplicateDataProperty.get().setGroupSize(2);
- dummyIREdge.setProperty(DataStoreProperty.of(store));
+ dummyIREdge.setProperty(InterTaskDataStoreProperty.of(store));
dummyIREdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
final RuntimeEdge dummyEdge, dummyEdge2;
final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
@@ -414,24 +410,12 @@ public final class DataTransferTest {
final IRVertex dstMockVertex = mock(IRVertex.class);
final Stage srcStage = setupStages("srcStage-" + testIndex);
final Stage dstStage = setupStages("dstStage-" + testIndex);
- dummyEdge = new StageEdgeBuilder(edgeId)
- .setEdgeProperties(edgeProperties)
- .setSrcVertex(srcMockVertex)
- .setDstVertex(dstMockVertex)
- .setSrcStage(srcStage)
- .setDstStage(dstStage)
- .setSideInputFlag(false)
- .build();
+ dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
+ srcStage, dstStage, false);
final IRVertex dstMockVertex2 = mock(IRVertex.class);
final Stage dstStage2 = setupStages("dstStage-" + testIndex2);
- dummyEdge2 = new StageEdgeBuilder(edgeId2)
- .setEdgeProperties(edgeProperties)
- .setSrcVertex(srcMockVertex)
- .setDstVertex(dstMockVertex2)
- .setSrcStage(srcStage)
- .setDstStage(dstStage2)
- .setSideInputFlag(false)
- .build();
+ dummyEdge2 = new StageEdge(edgeId2, edgeProperties, srcMockVertex, dstMockVertex2,
+ srcStage, dstStage2, false);
// Initialize states in Master
srcStage.getTaskIds().forEach(srcTaskId -> {
final String blockId = RuntimeIdGenerator.generateBlockId(
@@ -561,6 +545,9 @@ public final class DataTransferTest {
private Stage setupStages(final String stageId) {
final DAG<IRVertex, RuntimeEdge<IRVertex>> emptyDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>().build();
- return new Stage(stageId, emptyDag, PARALLELISM_TEN, 0, "Not_used", Collections.emptyList());
+ final ExecutionPropertyMap<VertexExecutionProperty> stageExecutionProperty = new ExecutionPropertyMap<>(stageId);
+ stageExecutionProperty.put(ParallelismProperty.of(PARALLELISM_TEN));
+ stageExecutionProperty.put(ScheduleGroupIndexProperty.of(0));
+ return new Stage(stageId, emptyDag, stageExecutionProperty, Collections.emptyList());
}
}
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index 998f3aa..5e0a267 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -20,10 +20,11 @@ import edu.snu.nemo.common.ir.OutputCollector;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.DAGBuilder;
import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
import edu.snu.nemo.common.ir.vertex.InMemorySourceVertex;
import edu.snu.nemo.common.ir.vertex.OperatorVertex;
import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
@@ -67,7 +68,8 @@ import static org.mockito.Mockito.*;
TaskStateManager.class, StageEdge.class})
public final class TaskExecutorTest {
private static final int DATA_SIZE = 100;
- private static final String CONTAINER_TYPE = "CONTAINER_TYPE";
+ private static final ExecutionPropertyMap<VertexExecutionProperty> TASK_EXECUTION_PROPERTY_MAP
+ = new ExecutionPropertyMap<>("TASK_EXECUTION_PROPERTY_MAP");
private static final int SOURCE_PARALLELISM = 5;
private List<Integer> elements;
private Map<String, List> vertexIdToOutputData;
@@ -137,7 +139,7 @@ public final class TaskExecutorTest {
"testSourceVertexDataFetching",
generateTaskId(),
0,
- CONTAINER_TYPE,
+ TASK_EXECUTION_PROPERTY_MAP,
new byte[0],
Collections.emptyList(),
Collections.singletonList(mockStageEdgeFrom(sourceIRVertex)),
@@ -167,7 +169,7 @@ public final class TaskExecutorTest {
"testSourceVertexDataFetching",
generateTaskId(),
0,
- CONTAINER_TYPE,
+ TASK_EXECUTION_PROPERTY_MAP,
new byte[0],
Collections.singletonList(mockStageEdgeTo(vertex)),
Collections.singletonList(mockStageEdgeFrom(vertex)),
@@ -205,7 +207,7 @@ public final class TaskExecutorTest {
"testSourceVertexDataFetching",
generateTaskId(),
0,
- CONTAINER_TYPE,
+ TASK_EXECUTION_PROPERTY_MAP,
new byte[0],
Collections.singletonList(mockStageEdgeTo(operatorIRVertex1)),
Collections.singletonList(mockStageEdgeFrom(operatorIRVertex2)),
@@ -237,7 +239,7 @@ public final class TaskExecutorTest {
"testSourceVertexDataFetching",
generateTaskId(),
0,
- CONTAINER_TYPE,
+ TASK_EXECUTION_PROPERTY_MAP,
new byte[0],
Arrays.asList(mockStageEdgeTo(operatorIRVertex1), mockStageEdgeTo(operatorIRVertex2)),
Collections.singletonList(mockStageEdgeFrom(operatorIRVertex2)),
@@ -260,7 +262,7 @@ public final class TaskExecutorTest {
final boolean isSideInput) {
final String runtimeIREdgeId = "Runtime edge between operator tasks";
ExecutionPropertyMap edgeProperties = new ExecutionPropertyMap(runtimeIREdgeId);
- edgeProperties.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+ edgeProperties.put(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
return new RuntimeEdge<>(runtimeIREdgeId, edgeProperties, src, dst, isSideInput);
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index 273f108..d06e1b5 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -404,7 +404,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
physicalPlan.getId(),
taskId,
attemptIdx,
- stageToSchedule.getContainerType(),
+ stageToSchedule.getExecutionProperties(),
stageToSchedule.getSerializedIRDAG(),
stageIncomingEdges,
stageOutgoingEdges,
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
index b8f7d74..5f64e9c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
@@ -44,13 +44,15 @@ public final class ContainerTypeAwareSchedulingPolicy implements SchedulingPolic
public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
final Task task) {
- if (task.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
+ final String executorPlacementPropertyValue = task.getPropertyValue(ExecutorPlacementProperty.class)
+ .orElse(ExecutorPlacementProperty.NONE);
+ if (executorPlacementPropertyValue.equals(ExecutorPlacementProperty.NONE)) {
return executorRepresenterSet;
}
final Set<ExecutorRepresenter> candidateExecutors =
executorRepresenterSet.stream()
- .filter(executor -> executor.getContainerType().equals(task.getContainerType()))
+ .filter(executor -> executor.getContainerType().equals(executorPlacementPropertyValue))
.collect(Collectors.toSet());
return candidateExecutors;
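
Note on the ContainerTypeAwareSchedulingPolicy hunk above: the policy now reads the placement value from the task's execution properties as an Optional and falls back to NONE when the property is absent. Below is a minimal, self-contained sketch of that filtering pattern. It is an illustration under stated assumptions, not Nemo's implementation: plain strings stand in for Task and ExecutorRepresenter, and the class name, method name, and the "None" value are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/** Illustration only: plain strings stand in for Nemo's Task and ExecutorRepresenter. */
final class PlacementFilterSketch {
  // Assumed placeholder for ExecutorPlacementProperty.NONE; the real value is not shown in this diff.
  static final String NONE = "None";

  /**
   * Returns the ids of executors whose container type matches the requested placement.
   * An absent property is treated like NONE, so every executor stays a candidate.
   */
  static Set<String> filterExecutors(final Map<String, String> executorIdToContainerType,
                                     final Optional<String> placement) {
    final String wanted = placement.orElse(NONE);
    if (wanted.equals(NONE)) {
      return executorIdToContainerType.keySet();
    }
    return executorIdToContainerType.entrySet().stream()
        .filter(entry -> entry.getValue().equals(wanted))
        .map(Map.Entry::getKey)
        .collect(Collectors.toSet());
  }

  public static void main(final String[] args) {
    final Map<String, String> executors = new HashMap<>();
    executors.put("a0", "Reserved");
    executors.put("a1", "Reserved");
    executors.put("a2", NONE);
    // Only the two "Reserved" executors remain.
    System.out.println(filterExecutors(executors, Optional.of("Reserved")));
    // No placement property set: all three executors remain.
    System.out.println(filterExecutors(executors, Optional.empty()));
  }
}
```

Resolving the Optional once with orElse keeps the absent-property case and the explicit NONE case on the same code path, which is what the rewritten filterExecutorRepresenters does.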
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
index b507a0b..ddcfbf5 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
@@ -62,7 +62,7 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
if (taskIdToTask == null) {
final Map<String, Task> taskIdToTaskMap = new HashMap<>();
taskIdToTaskMap.put(task.getTaskId(), task);
- updateSchedulableStages(stageId, task.getContainerType());
+ updateSchedulableStages(stageId);
return taskIdToTaskMap;
} else {
taskIdToTask.put(task.getTaskId(), task);
@@ -102,7 +102,7 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
}
stageIdToPendingTasks.remove(stageId);
stageIdToPendingTasks.forEach((scheduledStageId, tasks) ->
- updateSchedulableStages(scheduledStageId, tasks.values().iterator().next().getContainerType()));
+ updateSchedulableStages(scheduledStageId));
}
return taskToSchedule;
@@ -157,18 +157,14 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
* NOTE: This method provides the "line up" between stages, by assigning priorities,
* serving as the key to the "priority" implementation of this class.
* @param candidateStageId for the stage that can potentially be scheduled.
- * @param candidateStageContainerType for the stage that can potentially be scheduled.
*/
- private synchronized void updateSchedulableStages(
- final String candidateStageId, final String candidateStageContainerType) {
+ private synchronized void updateSchedulableStages(final String candidateStageId) {
final DAG<Stage, StageEdge> jobDAG = physicalPlan.getStageDAG();
- if (isSchedulable(candidateStageId, candidateStageContainerType)) {
+ if (isSchedulable(candidateStageId)) {
// Check for ancestor stages that became schedulable due to candidateStage's absence from the queue.
jobDAG.getAncestors(candidateStageId).forEach(ancestorStage -> {
- // Remove the ancestor stage if it is of the same container type.
- if (schedulableStages.contains(ancestorStage.getId())
- && candidateStageContainerType.equals(ancestorStage.getContainerType())) {
+ if (schedulableStages.contains(ancestorStage.getId())) {
if (!schedulableStages.remove(ancestorStage.getId())) {
throw new RuntimeException(String.format("No such stage: %s", ancestorStage.getId()));
}
@@ -183,16 +179,13 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
/**
* Determines whether the given candidate stage is schedulable immediately or not.
* @param candidateStageId for the stage that can potentially be scheduled.
- * @param candidateStageContainerType for the stage that can potentially be scheduled.
* @return true if schedulable, false otherwise.
*/
- private synchronized boolean isSchedulable(final String candidateStageId, final String candidateStageContainerType) {
+ private synchronized boolean isSchedulable(final String candidateStageId) {
final DAG<Stage, StageEdge> jobDAG = physicalPlan.getStageDAG();
for (final Stage descendantStage : jobDAG.getDescendants(candidateStageId)) {
if (schedulableStages.contains(descendantStage.getId())) {
- if (candidateStageContainerType.equals(descendantStage.getContainerType())) {
- return false;
- }
+ return false;
}
}
return true;
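
Note on the SingleJobTaskCollection hunks above: with the container-type comparison removed, isSchedulable reduces to "no descendant stage is currently in the schedulable set". The sketch below restates that check on plain collections. It is a hypothetical illustration: the class name and the precomputed descendants map are assumptions that stand in for PhysicalPlan.getStageDAG() and DAG.getDescendants().

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustration only: a descendants map stands in for the stage DAG. */
final class SchedulableStagesSketch {
  /**
   * A candidate stage is schedulable only if none of its descendant stages
   * is already in the schedulable set.
   */
  static boolean isSchedulable(final String candidateStageId,
                               final Map<String, Set<String>> stageIdToDescendants,
                               final Set<String> schedulableStages) {
    for (final String descendantId
        : stageIdToDescendants.getOrDefault(candidateStageId, Collections.emptySet())) {
      if (schedulableStages.contains(descendantId)) {
        return false;
      }
    }
    return true;
  }

  public static void main(final String[] args) {
    // Linear chain s0 -> s1 -> s2.
    final Map<String, Set<String>> descendants = new HashMap<>();
    descendants.put("s0", new HashSet<>(Arrays.asList("s1", "s2")));
    descendants.put("s1", new HashSet<>(Collections.singletonList("s2")));
    descendants.put("s2", Collections.emptySet());

    final Set<String> schedulable = new HashSet<>(Collections.singletonList("s2"));
    System.out.println(isSchedulable("s0", descendants, schedulable)); // false: s2 is a descendant
    System.out.println(isSchedulable("s2", descendants, schedulable)); // true: s2 has no descendants
  }
}
```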
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
index b8953fb..db5b1d4 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
@@ -122,7 +122,7 @@ public final class JobStateManagerTest {
final DAG<IRVertex, IREdge> irDAG = irDAGBuilder.build();
final DAG<Stage, StageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
final JobStateManager jobStateManager = new JobStateManager(
- new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
+ new PhysicalPlan("TestPlan", physicalDAG),
blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
assertFalse(jobStateManager.checkJobTermination());
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
index 55808c9..7a2d149 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
@@ -49,7 +49,8 @@ public final class ContainerTypeAwareSchedulingPolicyTest {
final ExecutorRepresenter a2 = mockExecutorRepresenter(ExecutorPlacementProperty.NONE);
final Task task1 = mock(Task.class);
- when(task1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
+ when(task1.getPropertyValue(ExecutorPlacementProperty.class))
+ .thenReturn(Optional.of(ExecutorPlacementProperty.RESERVED));
final Set<ExecutorRepresenter> executorRepresenterList1 = new HashSet<>(Arrays.asList(a0, a1, a2));
@@ -60,7 +61,8 @@ public final class ContainerTypeAwareSchedulingPolicyTest {
assertEquals(expectedExecutors1, candidateExecutors1);
final Task task2 = mock(Task.class);
- when(task2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
+ when(task2.getPropertyValue(ExecutorPlacementProperty.class))
+ .thenReturn(Optional.of(ExecutorPlacementProperty.NONE));
final Set<ExecutorRepresenter> executorRepresenterList2 = new HashSet<>(Arrays.asList(a0, a1, a2));
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
index bb9fc2c..96c65eb 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
@@ -136,18 +136,18 @@ public final class FaultToleranceTest {
final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
for (final Stage stage : dagOf4Stages) {
- if (stage.getScheduleGroupIndex() == 0) {
+ if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
- // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
+ // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
executorRegistry, false);
assertTrue(pendingTaskCollection.isEmpty());
stage.getTaskIds().forEach(taskId ->
SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
taskId, TaskState.State.COMPLETE, 1));
- } else if (stage.getScheduleGroupIndex() == 1) {
+ } else if (stage.getScheduleGroupIndex() == 2) {
scheduler.onExecutorRemoved("a3");
- // There are 2 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 1.
+ // There are 2 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
executorRegistry, false);
@@ -165,11 +165,14 @@ public final class FaultToleranceTest {
stage.getTaskIds().forEach(taskId ->
SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
taskId, TaskState.State.COMPLETE, 1));
- } else {
- // There are 1 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
+ } else if (stage.getScheduleGroupIndex() == 3) {
+ // There is 1 executor of capacity 2, and there are 2 Tasks in ScheduleGroup 3.
// Schedule only the first Task
SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
executorRegistry, true);
+ } else {
+ throw new RuntimeException(String.format("Unexpected ScheduleGroupIndex: %d",
+ stage.getScheduleGroupIndex()));
}
}
}
@@ -207,17 +210,17 @@ public final class FaultToleranceTest {
final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
for (final Stage stage : dagOf4Stages) {
- if (stage.getScheduleGroupIndex() == 0) {
+ if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
- // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
+ // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
executorRegistry, false);
assertTrue(pendingTaskCollection.isEmpty());
stage.getTaskIds().forEach(taskId ->
SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
taskId, TaskState.State.COMPLETE, 1));
- } else if (stage.getScheduleGroupIndex() == 1) {
- // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 1.
+ } else if (stage.getScheduleGroupIndex() == 2) {
+ // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
executorRegistry, false);
assertTrue(pendingTaskCollection.isEmpty());
@@ -230,7 +233,7 @@ public final class FaultToleranceTest {
}
- assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 3);
+ assertEquals(3, jobStateManager.getAttemptCountForStage(stage.getId()));
assertFalse(pendingTaskCollection.isEmpty());
stage.getTaskIds().forEach(taskId -> {
assertEquals(jobStateManager.getTaskState(taskId).getStateMachine().getCurrentState(),
@@ -273,17 +276,17 @@ public final class FaultToleranceTest {
final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
for (final Stage stage : dagOf4Stages) {
- if (stage.getScheduleGroupIndex() == 0) {
+ if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) {
- // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
+ // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1.
SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
executorRegistry, false);
assertTrue(pendingTaskCollection.isEmpty());
stage.getTaskIds().forEach(taskId ->
SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
taskId, TaskState.State.COMPLETE, 1));
- } else if (stage.getScheduleGroupIndex() == 1) {
- // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 1.
+ } else if (stage.getScheduleGroupIndex() == 2) {
+ // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
executorRegistry, false);
@@ -296,7 +299,7 @@ public final class FaultToleranceTest {
}
- assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 2);
+ assertEquals(2, jobStateManager.getAttemptCountForStage(stage.getId()));
stage.getTaskIds().forEach(taskId -> {
assertEquals(jobStateManager.getTaskState(taskId).getStateMachine().getCurrentState(),
TaskState.State.READY);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
index 23c5ee7..bb48746 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
@@ -217,7 +217,7 @@ public final class SingleTaskQueueTest {
"TestPlan",
taskId,
0,
- stage.getContainerType(),
+ stage.getExecutionProperties(),
stage.getSerializedIRDAG(),
Collections.emptyList(),
Collections.emptyList(),
diff --git a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
index 65f10b2..0af8c8d 100644
--- a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
+++ b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
@@ -100,7 +100,7 @@ public final class TestPlanGenerator {
final Policy policy) throws Exception {
final DAG<IRVertex, IREdge> optimized = CompiletimeOptimizer.optimize(irDAG, policy, EMPTY_DAG_DIRECTORY);
final DAG<Stage, StageEdge> physicalDAG = optimized.convert(PLAN_GENERATOR);
- return new PhysicalPlan("TestPlan", physicalDAG, PLAN_GENERATOR.getIdToIRVertex());
+ return new PhysicalPlan("TestPlan", physicalDAG);
}
/**
diff --git a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
new file mode 100644
index 0000000..a60cf88
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.plan;
+
+import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import edu.snu.nemo.common.test.EmptyComponents;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Tests {@link StagePartitioner}.
+ */
+public final class StagePartitionerTest {
+ private static final Transform EMPTY_TRANSFORM = new EmptyComponents.EmptyTransform("empty");
+
+ private StagePartitioner stagePartitioner;
+
+ @Before
+ public void setup() throws InjectionException {
+ stagePartitioner = Tang.Factory.getTang().newInjector().getInstance(StagePartitioner.class);
+ stagePartitioner.addIgnoredPropertyKey(DynamicOptimizationProperty.class);
+ }
+
+ /**
+ * @param parallelism {@link ParallelismProperty} value for the new vertex
+ * @param scheduleGroupIndex {@link ScheduleGroupIndexProperty} value for the new vertex
+ * @param otherProperties other {@link VertexExecutionProperty} for the new vertex
+ * @return new {@link IRVertex}
+ */
+ private static IRVertex newVertex(final int parallelism, final int scheduleGroupIndex,
+ final List<VertexExecutionProperty> otherProperties) {
+ final IRVertex vertex = new OperatorVertex(EMPTY_TRANSFORM);
+ vertex.getExecutionProperties().put(ParallelismProperty.of(parallelism));
+ vertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(scheduleGroupIndex));
+ otherProperties.forEach(property -> vertex.getExecutionProperties().put(property));
+ return vertex;
+ }
+
+ /**
+ * A simple case where two vertices share the same parallelism and ScheduleGroupIndex, so they are merged into one stage.
+ */
+ @Test
+ public void testLinear() {
+ final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+ final IRVertex v0 = newVertex(5, 0, Collections.emptyList());
+ final IRVertex v1 = newVertex(5, 0, Collections.emptyList());
+ dagBuilder.addVertex(v0);
+ dagBuilder.addVertex(v1);
+ dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v1));
+ final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+ assertEquals(0, (int) partitioning.get(v0));
+ assertEquals(0, (int) partitioning.get(v1));
+ }
+
+ /**
+ * A simple case where two vertices have different parallelism.
+ */
+ @Test
+ public void testSplitByParallelism() {
+ final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+ final IRVertex v0 = newVertex(5, 0, Collections.emptyList());
+ final IRVertex v1 = newVertex(1, 0, Collections.emptyList());
+ dagBuilder.addVertex(v0);
+ dagBuilder.addVertex(v1);
+ dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v1));
+ final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+ assertNotEquals(partitioning.get(v0), partitioning.get(v1));
+ }
+
+ /**
+ * A simple case where two vertices have different ScheduleGroupIndex.
+ */
+ @Test
+ public void testSplitByScheduleGroupIndex() {
+ final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+ final IRVertex v0 = newVertex(1, 0, Collections.emptyList());
+ final IRVertex v1 = newVertex(1, 1, Collections.emptyList());
+ dagBuilder.addVertex(v0);
+ dagBuilder.addVertex(v1);
+ dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v1));
+ final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+ assertNotEquals(partitioning.get(v0), partitioning.get(v1));
+ }
+
+ /**
+ * A simple case where two vertices are connected by a Shuffle edge.
+ */
+ @Test
+ public void testSplitByShuffle() {
+ final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+ final IRVertex v0 = newVertex(1, 0, Collections.emptyList());
+ final IRVertex v1 = newVertex(1, 0, Collections.emptyList());
+ dagBuilder.addVertex(v0);
+ dagBuilder.addVertex(v1);
+ dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v0, v1));
+ final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+ assertNotEquals(partitioning.get(v0), partitioning.get(v1));
+ }
+
+ /**
+ * A simple case where one of the two vertices has an additional property.
+ */
+ @Test
+ public void testSplitByOtherProperty() {
+ final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+ final IRVertex v0 = newVertex(1, 0,
+ Arrays.asList(ExecutorPlacementProperty.of(ExecutorPlacementProperty.RESERVED)));
+ final IRVertex v1 = newVertex(1, 0, Collections.emptyList());
+ dagBuilder.addVertex(v0);
+ dagBuilder.addVertex(v1);
+ dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v1));
+ final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+ assertNotEquals(partitioning.get(v0), partitioning.get(v1));
+ }
+
+ /**
+ * A simple case where one of the two vertices has an ignored property.
+ */
+ @Test
+ public void testNotSplitByIgnoredProperty() {
+ final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+ final IRVertex v0 = newVertex(1, 0,
+ Arrays.asList(DynamicOptimizationProperty.of(DynamicOptimizationProperty.Value.DataSkewRuntimePass)));
+ final IRVertex v1 = newVertex(1, 0, Collections.emptyList());
+ dagBuilder.addVertex(v0);
+ dagBuilder.addVertex(v1);
+ dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v1));
+ final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+ assertEquals(0, (int) partitioning.get(v0));
+ assertEquals(0, (int) partitioning.get(v1));
+ }
+
+ /**
+ * Test scenario when there is a join.
+ */
+ @Test
+ public void testJoin() {
+ final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+ final IRVertex v0 = newVertex(5, 0, Collections.emptyList());
+ final IRVertex v1 = newVertex(5, 0, Collections.emptyList());
+ final IRVertex v2 = newVertex(5, 0, Collections.emptyList());
+ dagBuilder.addVertex(v0);
+ dagBuilder.addVertex(v1);
+ dagBuilder.addVertex(v2);
+ dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v0, v2));
+ dagBuilder.connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v2));
+ final Map<IRVertex, Integer> partitioning = stagePartitioner.apply(dagBuilder.buildWithoutSourceSinkCheck());
+ assertNotEquals(partitioning.get(v0), partitioning.get(v1));
+ assertNotEquals(partitioning.get(v1), partitioning.get(v2));
+ assertNotEquals(partitioning.get(v2), partitioning.get(v0));
+ }
+}
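
Reviewer note: the merge rule that StagePartitionerTest exercises can be sketched in isolation. The sketch below is inferred only from the test expectations above (OneToOne edge, equal properties, and a single incoming edge lead to a merge); it is not the StagePartitioner implementation. The names StagePartitionSketch, Vertex, Edge and partition are hypothetical, and vertices are assumed to be listed in topological order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical, simplified model of the rule the tests above check.
final class StagePartitionSketch {
  /** Stand-in for an IRVertex: only the properties the tests vary. */
  static final class Vertex {
    final int parallelism;
    final int scheduleGroupIndex;
    final String placement; // null when no placement property is set

    Vertex(final int parallelism, final int scheduleGroupIndex, final String placement) {
      this.parallelism = parallelism;
      this.scheduleGroupIndex = scheduleGroupIndex;
      this.placement = placement;
    }

    /** True when every non-ignored property matches. */
    boolean sameStageProperties(final Vertex other) {
      return parallelism == other.parallelism
          && scheduleGroupIndex == other.scheduleGroupIndex
          && Objects.equals(placement, other.placement);
    }
  }

  /** Stand-in for an IREdge; src and dst are positions in the vertex list. */
  static final class Edge {
    final int src;
    final int dst;
    final boolean oneToOne; // false models Shuffle and other patterns

    Edge(final int src, final int dst, final boolean oneToOne) {
      this.src = src;
      this.dst = dst;
      this.oneToOne = oneToOne;
    }
  }

  /** Returns a stage id per vertex; assumes src < dst for every edge. */
  static int[] partition(final List<Vertex> vertices, final List<Edge> edges) {
    final int[] stage = new int[vertices.size()];
    int nextStageId = 0;
    for (int v = 0; v < vertices.size(); v++) {
      final List<Edge> incoming = new ArrayList<>();
      for (final Edge edge : edges) {
        if (edge.dst == v) {
          incoming.add(edge);
        }
      }
      // Merge only through a single OneToOne in-edge between matching vertices.
      final boolean merge = incoming.size() == 1
          && incoming.get(0).oneToOne
          && vertices.get(incoming.get(0).src).sameStageProperties(vertices.get(v));
      stage[v] = merge ? stage[incoming.get(0).src] : nextStageId++;
    }
    return stage;
  }

  public static void main(final String[] args) {
    final List<Vertex> vertices = Arrays.asList(new Vertex(5, 0, null), new Vertex(5, 0, null));
    final List<Edge> edges = Arrays.asList(new Edge(0, 1, true));
    System.out.println(Arrays.toString(partition(vertices, edges))); // prints [0, 0]
  }
}
```

Under these assumptions a join vertex always opens a new stage, which matches the three pairwise assertNotEquals calls in testJoin.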
diff --git a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
index 03952fe..0d60a5d 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
@@ -82,7 +82,7 @@ public class ClientEndpointTest {
injector.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);
final BlockManagerMaster pmm = injector.getInstance(BlockManagerMaster.class);
final JobStateManager jobStateManager = new JobStateManager(
- new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
+ new PhysicalPlan("TestPlan", physicalDAG),
pmm, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
final DriverEndpoint driverEndpoint = new DriverEndpoint(jobStateManager, clientEndpoint);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
new file mode 100644
index 0000000..5cf95b5
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.tests.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.client.JobLauncher;
+import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import edu.snu.nemo.common.test.EmptyComponents;
+import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
+import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
+import edu.snu.nemo.tests.compiler.CompilerTestUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test {@link DefaultScheduleGroupPass}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class DefaultScheduleGroupPassTest {
+ private static final Transform EMPTY_TRANSFORM = new EmptyComponents.EmptyTransform("empty");
+
+ @Test
+ public void testAnnotatingPass() {
+ final AnnotatingPass scheduleGroupPass = new DefaultScheduleGroupPass();
+ assertEquals(ScheduleGroupIndexProperty.class, scheduleGroupPass.getExecutionPropertyToModify());
+ }
+
+ /**
+ * This test ensures that a topologically sorted DAG has a non-decreasing sequence of schedule group indexes.
+ */
+ @Test
+ public void testTopologicalOrdering() throws Exception {
+ final DAG<IRVertex, IREdge> compiledDAG = CompilerTestUtil.compileALSDAG();
+ final DAG<IRVertex, IREdge> processedDAG = CompiletimeOptimizer.optimize(compiledDAG,
+ new TestPolicy(), "");
+
+ for (final IRVertex irVertex : processedDAG.getTopologicalSort()) {
+ final Integer currentScheduleGroupIndex = irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get();
+ final Integer largestScheduleGroupIndexOfParent = processedDAG.getParents(irVertex.getId()).stream()
+ .mapToInt(v -> v.getPropertyValue(ScheduleGroupIndexProperty.class).get())
+ .max().orElse(0);
+ assertTrue(currentScheduleGroupIndex >= largestScheduleGroupIndexOfParent);
+ }
+ }
+
+ /**
+ * Return a DAG that has a branch.
+ * {@literal
+ * /-- v3 --- v4
+ * v0 --- v1 --- v2 --/
+ * }
+ *
+ * @param communicationPattern {@link DataCommunicationPatternProperty.Value} for the edges
+ * @param dataFlowModel {@link DataFlowModelProperty.Value} for the edges
+ * @return a {@link Pair} of {@link DAG} and {@link List} of {@link IRVertex}
+ */
+ private static Pair<DAG<IRVertex, IREdge>, List<IRVertex>> generateBranchDAG(
+ final DataCommunicationPatternProperty.Value communicationPattern,
+ final DataFlowModelProperty.Value dataFlowModel) {
+ final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+
+ final IRVertex v0 = new OperatorVertex(EMPTY_TRANSFORM);
+ final IRVertex v1 = new OperatorVertex(EMPTY_TRANSFORM);
+ final IRVertex v2 = new OperatorVertex(EMPTY_TRANSFORM);
+ final IRVertex v3 = new OperatorVertex(EMPTY_TRANSFORM);
+ final IRVertex v4 = new OperatorVertex(EMPTY_TRANSFORM);
+
+ final IREdge e0 = new IREdge(communicationPattern, v0, v1);
+ final IREdge e1 = new IREdge(communicationPattern, v1, v2);
+ final IREdge e2 = new IREdge(communicationPattern, v1, v3);
+ final IREdge e3 = new IREdge(communicationPattern, v2, v4);
+ final IREdge e4 = new IREdge(communicationPattern, v3, v4);
+
+ final List<IRVertex> vertices = Arrays.asList(v0, v1, v2, v3, v4);
+ for (final IRVertex vertex : vertices) {
+ dagBuilder.addVertex(vertex);
+ }
+ for (final IREdge edge : Arrays.asList(e0, e1, e2, e3, e4)) {
+ edge.getExecutionProperties().put(DataFlowModelProperty.of(dataFlowModel));
+ dagBuilder.connectVertices(edge);
+ }
+ return Pair.of(dagBuilder.buildWithoutSourceSinkCheck(), vertices);
+ }
+
+ /**
+ * Return a DAG that has a join.
+ * {@literal
+ * v0 --- v1 --- v4 -- v5
+ * v2 --- v3 --/
+ * }
+ *
+ * @param communicationPattern {@link DataCommunicationPatternProperty.Value} for the edges
+ * @param dataFlowModel {@link DataFlowModelProperty.Value} for the edges
+ * @return a {@link Pair} of {@link DAG} and {@link List} of {@link IRVertex}
+ */
+ private static Pair<DAG<IRVertex, IREdge>, List<IRVertex>> generateJoinDAG(
+ final DataCommunicationPatternProperty.Value communicationPattern,
+ final DataFlowModelProperty.Value dataFlowModel) {
+ final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+
+ final IRVertex v0 = new OperatorVertex(EMPTY_TRANSFORM);
+ final IRVertex v1 = new OperatorVertex(EMPTY_TRANSFORM);
+ final IRVertex v2 = new OperatorVertex(EMPTY_TRANSFORM);
+ final IRVertex v3 = new OperatorVertex(EMPTY_TRANSFORM);
+ final IRVertex v4 = new OperatorVertex(EMPTY_TRANSFORM);
+ final IRVertex v5 = new OperatorVertex(EMPTY_TRANSFORM);
+
+ final IREdge e0 = new IREdge(communicationPattern, v0, v1);
+ final IREdge e1 = new IREdge(communicationPattern, v2, v3);
+ final IREdge e2 = new IREdge(communicationPattern, v1, v4);
+ final IREdge e3 = new IREdge(communicationPattern, v3, v4);
+ final IREdge e4 = new IREdge(communicationPattern, v4, v5);
+
+ final List<IRVertex> vertices = Arrays.asList(v0, v1, v2, v3, v4, v5);
+ for (final IRVertex vertex : vertices) {
+ dagBuilder.addVertex(vertex);
+ }
+ for (final IREdge edge : Arrays.asList(e0, e1, e2, e3, e4)) {
+ edge.getExecutionProperties().put(DataFlowModelProperty.of(dataFlowModel));
+ dagBuilder.connectVertices(edge);
+ }
+ return Pair.of(dagBuilder.buildWithoutSourceSinkCheck(), vertices);
+ }
+
+ /**
+ * Asserts that the {@link ScheduleGroupIndexProperty} is equal to {@code expected}.
+ * @param expected the expected property value
+ * @param vertex the vertex to test
+ */
+ private static void assertScheduleGroupIndex(final int expected, final IRVertex vertex) {
+ assertEquals(expected, getScheduleGroupIndex(vertex));
+ }
+
+ /**
+ * @param vertex a vertex
+ * @return {@link ScheduleGroupIndexProperty} of {@code vertex}
+ */
+ private static int getScheduleGroupIndex(final IRVertex vertex) {
+ return vertex.getPropertyValue(ScheduleGroupIndexProperty.class)
+ .orElseThrow(() -> new RuntimeException(String.format("ScheduleGroup not set for %s", vertex.getId())));
+ }
+
+ /**
+ * Ensures that all vertices in {@code vertices} have pairwise different {@link ScheduleGroupIndexProperty} values.
+ * @param vertices vertices to test
+ */
+ private static void assertDifferentScheduleGroupIndex(final Collection<IRVertex> vertices) {
+ final Set<Integer> indices = new HashSet<>();
+ vertices.forEach(v -> {
+ final int idx = getScheduleGroupIndex(v);
+ assertFalse(indices.contains(idx));
+ indices.add(idx);
+ });
+ }
+
+ /**
+ * Test scenario when {@code allowMultipleInEdgesWithinScheduleGroup} is {@code true} and the DAG contains a branch.
+ */
+ @Test
+ public void testBranch() {
+ final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass();
+ final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+ = generateBranchDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull);
+ pass.apply(dag.left());
+ dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+ }
+
+ /**
+ * Test scenario when {@code allowMultipleInEdgesWithinScheduleGroup} is {@code false} and the DAG contains a branch.
+ */
+ @Test
+ public void testBranchWhenMultipleInEdgeNotAllowed() {
+ final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(false, false, false);
+ final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+ = generateBranchDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull);
+ pass.apply(dag.left());
+ dag.right().subList(0, 4).forEach(v -> assertScheduleGroupIndex(0, v));
+ dag.right().subList(4, 5).forEach(v -> assertScheduleGroupIndex(1, v));
+ }
+
+ /**
+ * Test scenario to determine whether push edges properly enforce the same scheduleGroupIndex.
+ */
+ @Test
+ public void testBranchWithPush() {
+ final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(false, false, false);
+ final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+ = generateBranchDAG(DataCommunicationPatternProperty.Value.Shuffle, DataFlowModelProperty.Value.Push);
+ pass.apply(dag.left());
+ dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+ }
+
+ /**
+ * Test scenario when {@code allowBroadcastWithinScheduleGroup} is {@code false} and DAG contains Broadcast edges.
+ */
+ @Test
+ public void testBranchWithBroadcast() {
+ final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(false, true, true);
+ final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+ = generateBranchDAG(DataCommunicationPatternProperty.Value.BroadCast, DataFlowModelProperty.Value.Pull);
+ assertDifferentScheduleGroupIndex(pass.apply(dag.left()).getVertices());
+ }
+
+ /**
+ * Test scenario when {@code allowShuffleWithinScheduleGroup} is {@code false} and DAG contains Shuffle edges.
+ */
+ @Test
+ public void testBranchWithShuffle() {
+ final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(true, false, true);
+ final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+ = generateBranchDAG(DataCommunicationPatternProperty.Value.Shuffle, DataFlowModelProperty.Value.Pull);
+ assertDifferentScheduleGroupIndex(pass.apply(dag.left()).getVertices());
+ }
+
+ /**
+ * Test scenario when {@code allowMultipleInEdgesWithinScheduleGroup} is {@code true} and the DAG contains a join.
+ */
+ @Test
+ public void testJoin() {
+ final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass();
+ final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+ = generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull);
+ pass.apply(dag.left());
+ final int idxForFirstScheduleGroup = getScheduleGroupIndex(dag.right().get(0));
+ final int idxForSecondScheduleGroup = getScheduleGroupIndex(dag.right().get(2));
+ dag.right().subList(0, 2).forEach(v -> assertScheduleGroupIndex(idxForFirstScheduleGroup, v));
+ dag.right().subList(2, 4).forEach(v -> assertScheduleGroupIndex(idxForSecondScheduleGroup, v));
+ dag.right().subList(4, 6).forEach(v -> assertScheduleGroupIndex(2, v));
+ }
+
+ /**
+ * Test scenario with multiple push inEdges.
+ */
+ @Test
+ public void testJoinWithPush() {
+ final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass();
+ final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+ = generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Push);
+ pass.apply(dag.left());
+ dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+ }
+
+ /**
+ * Test scenario with a single push inEdge.
+ */
+ @Test
+ public void testJoinWithSinglePush() {
+ final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass();
+ final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
+ = generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Push);
+ dag.left().getOutgoingEdgesOf(dag.right().get(1)).iterator().next()
+ .getExecutionProperties().put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
+ pass.apply(dag.left());
+ final int idxForFirstScheduleGroup = getScheduleGroupIndex(dag.right().get(0));
+ final int idxForSecondScheduleGroup = getScheduleGroupIndex(dag.right().get(2));
+ dag.right().subList(0, 2).forEach(v -> assertScheduleGroupIndex(idxForFirstScheduleGroup, v));
+ dag.right().subList(2, 6).forEach(v -> assertScheduleGroupIndex(idxForSecondScheduleGroup, v));
+ }
+}
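
Reviewer note: the invariant asserted by testTopologicalOrdering (no vertex has a smaller schedule group index than its largest parent) can be checked on plain arrays. This is a minimal sketch, assuming vertices are numbered 0..n-1 and parents are given as index lists; ScheduleGroupOrderCheck and isNonDecreasing are hypothetical names, not part of Nemo.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper mirroring the assertion in testTopologicalOrdering.
final class ScheduleGroupOrderCheck {
  /**
   * @param index   index[v] is the schedule group index of vertex v
   * @param parents parents.get(v) lists the parent vertices of v
   * @return true if no vertex has a smaller index than any of its parents
   */
  static boolean isNonDecreasing(final int[] index, final List<List<Integer>> parents) {
    for (int v = 0; v < index.length; v++) {
      // Root vertices have no parents, so the bound falls back to 0 as in the test.
      final int largestParent = parents.get(v).stream().mapToInt(p -> index[p]).max().orElse(0);
      if (index[v] < largestParent) {
        return false;
      }
    }
    return true;
  }

  public static void main(final String[] args) {
    // Join DAG from generateJoinDAG: v0 -> v1 -> v4 -> v5 and v2 -> v3 -> v4.
    final List<List<Integer>> parents = Arrays.asList(
        Collections.<Integer>emptyList(), Arrays.asList(0),
        Collections.<Integer>emptyList(), Arrays.asList(2),
        Arrays.asList(1, 3), Arrays.asList(4));
    System.out.println(isNonDecreasing(new int[] {0, 0, 1, 1, 2, 2}, parents)); // prints true
  }
}
```

The index assignment {0, 0, 1, 1, 2, 2} is one labeling consistent with the testJoin expectations; the test itself does not fix which of the two source branches receives index 0.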
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPassTest.java
deleted file mode 100644
index 9e222fc..0000000
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPassTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.tests.compiler.optimizer.pass.compiletime.annotating;
-
-import edu.snu.nemo.client.JobLauncher;
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
-import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
-import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
-import edu.snu.nemo.tests.compiler.CompilerTestUtil;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test {@link ScheduleGroupPass}.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(JobLauncher.class)
-public final class ScheduleGroupPassTest {
- @Before
- public void setUp() throws Exception {
- }
-
- @Test
- public void testAnnotatingPass() {
- final AnnotatingPass scheduleGroupPass = new ScheduleGroupPass();
- assertEquals(ScheduleGroupIndexProperty.class, scheduleGroupPass.getExecutionPropertyToModify());
- }
-
- /**
- * This test ensures that a topologically sorted DAG has an increasing sequence of schedule group indexes.
- */
- @Test
- public void testScheduleGroupPass() throws Exception {
- final DAG<IRVertex, IREdge> compiledDAG = CompilerTestUtil.compileALSDAG();
- final DAG<IRVertex, IREdge> processedDAG = CompiletimeOptimizer.optimize(compiledDAG,
- new TestPolicy(), "");
-
- for (final IRVertex irVertex : processedDAG.getTopologicalSort()) {
- final Integer currentScheduleGroupIndex = irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get();
- final Integer largestScheduleGroupIndexOfParent = processedDAG.getParents(irVertex.getId()).stream()
- .mapToInt(v -> v.getPropertyValue(ScheduleGroupIndexProperty.class).get())
- .max().orElse(0);
- assertTrue(currentScheduleGroupIndex >= largestScheduleGroupIndexOfParent);
- }
- }
-}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DisaggregationPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DisaggregationPassTest.java
index 898f74e..d4cc616 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DisaggregationPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DisaggregationPassTest.java
@@ -19,13 +19,11 @@ import edu.snu.nemo.client.JobLauncher;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultParallelismPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DisaggregationEdgeDataStorePass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ReviseInterStageEdgeDataStorePass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultInterTaskDataStorePass;
import edu.snu.nemo.tests.compiler.CompilerTestUtil;
import org.junit.Before;
import org.junit.Test;
@@ -52,22 +50,12 @@ public class DisaggregationPassTest {
public void testDisaggregation() throws Exception {
final DAG<IRVertex, IREdge> processedDAG =
new DisaggregationEdgeDataStorePass().apply(
- new ReviseInterStageEdgeDataStorePass().apply(
- new DefaultStagePartitioningPass().apply(
- new DefaultParallelismPass().apply(compiledDAG))));
+ new DefaultInterTaskDataStorePass().apply(
+ new DefaultParallelismPass().apply(compiledDAG)));
- processedDAG.getTopologicalSort().forEach(irVertex -> {
- processedDAG.getIncomingEdgesOf(irVertex).forEach(edgeToMerger -> {
- if (DataCommunicationPatternProperty.Value.OneToOne
- .equals(edgeToMerger.getPropertyValue(DataCommunicationPatternProperty.class).get())
- && edgeToMerger.getSrc().getPropertyValue(StageIdProperty.class).get()
- .equals(edgeToMerger.getDst().getPropertyValue(StageIdProperty.class).get())) {
- assertEquals(DataStoreProperty.Value.MemoryStore, edgeToMerger.getPropertyValue(DataStoreProperty.class).get());
- } else {
- assertEquals(DataStoreProperty.Value.GlusterFileStore,
- edgeToMerger.getPropertyValue(DataStoreProperty.class).get());
- }
- });
- });
+ processedDAG.getTopologicalSort().forEach(irVertex ->
+ processedDAG.getIncomingEdgesOf(irVertex).forEach(edgeToMerger ->
+ assertEquals(InterTaskDataStoreProperty.Value.GlusterFileStore,
+ edgeToMerger.getPropertyValue(InterTaskDataStoreProperty.class).get())));
}
}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/PadoCompositePassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/PadoCompositePassTest.java
index 73014e2..f983698 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/PadoCompositePassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/PadoCompositePassTest.java
@@ -19,7 +19,7 @@ import edu.snu.nemo.client.JobLauncher;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.PadoEdgeDataStorePass;
@@ -57,35 +57,35 @@ public class PadoCompositePassTest {
final IRVertex vertex5 = processedDAG.getTopologicalSort().get(1);
assertEquals(ExecutorPlacementProperty.TRANSIENT, vertex5.getPropertyValue(ExecutorPlacementProperty.class).get());
processedDAG.getIncomingEdgesOf(vertex5).forEach(irEdge -> {
- assertEquals(DataStoreProperty.Value.MemoryStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+ assertEquals(InterTaskDataStoreProperty.Value.MemoryStore, irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get());
assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
});
final IRVertex vertex6 = processedDAG.getTopologicalSort().get(2);
assertEquals(ExecutorPlacementProperty.RESERVED, vertex6.getPropertyValue(ExecutorPlacementProperty.class).get());
processedDAG.getIncomingEdgesOf(vertex6).forEach(irEdge -> {
- assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+ assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get());
assertEquals(DataFlowModelProperty.Value.Push, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
});
final IRVertex vertex4 = processedDAG.getTopologicalSort().get(6);
assertEquals(ExecutorPlacementProperty.RESERVED, vertex4.getPropertyValue(ExecutorPlacementProperty.class).get());
processedDAG.getIncomingEdgesOf(vertex4).forEach(irEdge -> {
- assertEquals(DataStoreProperty.Value.MemoryStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+ assertEquals(InterTaskDataStoreProperty.Value.MemoryStore, irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get());
assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
});
final IRVertex vertex12 = processedDAG.getTopologicalSort().get(10);
assertEquals(ExecutorPlacementProperty.TRANSIENT, vertex12.getPropertyValue(ExecutorPlacementProperty.class).get());
processedDAG.getIncomingEdgesOf(vertex12).forEach(irEdge -> {
- assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+ assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get());
assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
});
final IRVertex vertex14 = processedDAG.getTopologicalSort().get(12);
assertEquals(ExecutorPlacementProperty.RESERVED, vertex14.getPropertyValue(ExecutorPlacementProperty.class).get());
processedDAG.getIncomingEdgesOf(vertex14).forEach(irEdge -> {
- assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+ assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(InterTaskDataStoreProperty.class).get());
assertEquals(DataFlowModelProperty.Value.Push, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
});
}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
index e779f58..ab1beb7 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
@@ -61,8 +61,8 @@ public class SailfishPassTest {
edgeToMerger.getPropertyValue(DataFlowModelProperty.class).get());
assertEquals(UsedDataHandlingProperty.Value.Discard,
edgeToMerger.getPropertyValue(UsedDataHandlingProperty.class).get());
- assertEquals(DataStoreProperty.Value.SerializedMemoryStore,
- edgeToMerger.getPropertyValue(DataStoreProperty.class).get());
+ assertEquals(InterTaskDataStoreProperty.Value.SerializedMemoryStore,
+ edgeToMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
assertEquals(BytesDecoderFactory.of(),
edgeToMerger.getPropertyValue(DecoderProperty.class).get());
} else {
@@ -75,8 +75,8 @@ public class SailfishPassTest {
edgeFromMerger.getPropertyValue(DataFlowModelProperty.class).get());
assertEquals(DataCommunicationPatternProperty.Value.OneToOne,
edgeFromMerger.getPropertyValue(DataCommunicationPatternProperty.class).get());
- assertEquals(DataStoreProperty.Value.LocalFileStore,
- edgeFromMerger.getPropertyValue(DataStoreProperty.class).get());
+ assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore,
+ edgeFromMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
assertEquals(BytesEncoderFactory.of(),
edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
});
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
index 8d13374..b1ea8f8 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -16,8 +16,7 @@
package edu.snu.nemo.tests.compiler.optimizer.policy;
import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultScheduleGroupPass;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.PadoCompositePass;
import edu.snu.nemo.compiler.optimizer.policy.*;
import org.junit.Test;
@@ -29,21 +28,21 @@ public final class PolicyBuilderTest {
@Test
public void testDisaggregationPolicy() {
final Policy disaggregationPolicy = new DisaggregationPolicy();
- assertEquals(15, disaggregationPolicy.getCompileTimePasses().size());
+ assertEquals(14, disaggregationPolicy.getCompileTimePasses().size());
assertEquals(0, disaggregationPolicy.getRuntimePasses().size());
}
@Test
public void testPadoPolicy() {
final Policy padoPolicy = new PadoPolicy();
- assertEquals(17, padoPolicy.getCompileTimePasses().size());
+ assertEquals(16, padoPolicy.getCompileTimePasses().size());
assertEquals(0, padoPolicy.getRuntimePasses().size());
}
@Test
public void testDataSkewPolicy() {
final Policy dataSkewPolicy = new DataSkewPolicy();
- assertEquals(19, dataSkewPolicy.getCompileTimePasses().size());
+ assertEquals(18, dataSkewPolicy.getCompileTimePasses().size());
assertEquals(1, dataSkewPolicy.getRuntimePasses().size());
}
@@ -52,8 +51,7 @@ public final class PolicyBuilderTest {
try {
final Policy failPolicy = new PolicyBuilder()
.registerCompileTimePass(new PadoCompositePass())
- .registerCompileTimePass(new DefaultStagePartitioningPass())
- .registerCompileTimePass(new ScheduleGroupPass())
+ .registerCompileTimePass(new DefaultScheduleGroupPass())
.build();
} catch (Exception e) { // throw an exception if default execution properties are not set.
assertTrue(e instanceof CompileTimeOptimizationException);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/TestPolicy.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/TestPolicy.java
index 51ca1ae..59dbd4e 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/TestPolicy.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/TestPolicy.java
@@ -39,13 +39,12 @@ public final class TestPolicy implements Policy {
@Override
public List<CompileTimePass> getCompileTimePasses() {
List<CompileTimePass> policy = new ArrayList<>();
- policy.add(new DefaultStagePartitioningPass());
if (testPushPolicy) {
policy.add(new ShuffleEdgePushPass());
}
- policy.add(new ScheduleGroupPass());
+ policy.add(new DefaultScheduleGroupPass());
return policy;
}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
index 671fdb0..5175045 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.common.dag.DAGBuilder;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.InterTaskDataStoreProperty;
import edu.snu.nemo.common.ir.vertex.SourceVertex;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.OperatorVertex;
@@ -154,35 +154,35 @@ public final class DAGConverterTest {
// irDAGBuilder.addVertex(v7);
final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v2);
- e1.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+ e1.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
e1.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v3);
- e2.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+ e2.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
e2.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4);
- e3.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+ e3.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
e3.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push));
final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v5);
- e4.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+ e4.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.MemoryStore));
e4.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push));
final IREdge e5 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v6);
- e5.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+ e5.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
e5.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
final IREdge e6 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v8);
- e6.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+ e6.setProperty(InterTaskDataStoreProperty.of(InterTaskDataStoreProperty.Value.LocalFileStore));
e6.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
// final IREdge e7 = new IREdge(OneToOne, v7, v5);
-// e7.setProperty(DataStoreProperty.of(MemoryStore));
+// e7.setProperty(InterTaskDataStoreProperty.of(MemoryStore));
// e7.setProperty(Attribute.Key.PullOrPush, DataFlowModelProperty.Value.Push));
//
// final IREdge e8 = new IREdge(OneToOne, v5, v8);
-// e8.setProperty(DataStoreProperty.of(MemoryStore));
+// e8.setProperty(InterTaskDataStoreProperty.of(MemoryStore));
// e8.setProperty(Attribute.Key.PullOrPush, DataFlowModelProperty.Value.Pull));
// Stage 1 = {v1, v2, v3}