You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/06/15 00:19:33 UTC
[incubator-nemo] branch master updated: [NEMO-101] Make Coder as an
execution property (#32)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 7ac7478 [NEMO-101] Make Coder as an execution property (#32)
7ac7478 is described below
commit 7ac7478098293df93a304dfb5d556b3863eecd5f
Author: Sanha Lee <sa...@gmail.com>
AuthorDate: Fri Jun 15 09:19:27 2018 +0900
[NEMO-101] Make Coder as an execution property (#32)
JIRA: [NEMO-101: Make Coder as an execution property](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-101)
**Major changes:**
- Make `Coder` as an execution property and let optimization pass to configure it.
**Minor changes to note:**
- Implement `DefaultEdgeCoderPass` which assign `Coder.DUMMY_CODER` as a default and include it in `PrimitiveCompositePass`.
**Tests for the changes:**
- `DefaultEdgeCoderPassTest` is added for `DefaultEdgeCoderPass`.
- Existing other unit tests and integration tests also cover the change.
**Other comments:**
- N/A
resolves [NEMO-101: Make Coder as an execution property](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-101)
---
bin/json2dot.py | 44 +++---------
.../main/java/edu/snu/nemo/common/coder/Coder.java | 5 +-
.../java/edu/snu/nemo/common/ir/edge/IREdge.java | 39 ++++-------
...eyExtractorProperty.java => CoderProperty.java} | 16 +++--
.../executionproperty/KeyExtractorProperty.java | 2 +
.../ir/executionproperty/ExecutionProperty.java | 20 ++++++
.../edu/snu/nemo/common/ir/vertex/LoopVertex.java | 8 +--
.../frontend/beam/NemoPipelineVisitor.java | 9 +--
.../frontend/spark/core/SparkFrontendUtils.java | 8 ++-
.../frontend/spark/core/rdd/PairRDDFunctions.scala | 7 +-
.../compiler/frontend/spark/core/rdd/RDD.scala | 28 +++++---
.../compiler/optimizer/CompiletimeOptimizer.java | 2 +-
.../MapReduceDisaggregationOptimization.java | 7 +-
.../annotating/DefaultEdgeCoderPass.java | 51 ++++++++++++++
.../composite/PrimitiveCompositePass.java | 1 +
.../CommonSubexpressionEliminationPass.java | 4 +-
.../reshaping/DataSkewReshapingPass.java | 6 +-
.../compiletime/reshaping/LoopExtractionPass.java | 16 ++---
.../compiletime/reshaping/LoopOptimizations.java | 11 +--
.../reshaping/SailfishRelayReshapingPass.java | 8 ++-
.../runtime/common/plan/PhysicalPlanGenerator.java | 4 +-
.../snu/nemo/runtime/common/plan/RuntimeEdge.java | 59 +++++++---------
.../snu/nemo/runtime/common/plan/StageEdge.java | 34 +++++-----
.../nemo/runtime/common/plan/StageEdgeBuilder.java | 27 ++++----
.../edu/snu/nemo/runtime/executor/Executor.java | 13 ++--
.../runtime/executor/task/TaskExecutorTest.java | 3 +-
.../runtime/plangenerator/TestPlanGenerator.java | 13 ++--
.../snu/nemo/tests/common/ir/LoopVertexTest.java | 27 +++-----
.../ExecutionPropertyMapTest.java | 6 +-
.../compiler/backend/nemo/NemoBackendTest.java | 11 ++-
.../annotating/DefaultEdgeCoderPassTest.java | 79 ++++++++++++++++++++++
.../annotating/DefaultParallelismPassTest.java | 2 +-
.../CommonSubexpressionEliminationPassTest.java | 17 +++--
.../compiletime/reshaping/LoopFusionPassTest.java | 10 ++-
.../reshaping/LoopInvariantCodeMotionPassTest.java | 11 +--
.../optimizer/policy/PolicyBuilderTest.java | 6 +-
.../runtime/common/plan/DAGConverterTest.java | 14 ++--
.../executor/datatransfer/DataTransferTest.java | 40 ++++++++---
38 files changed, 404 insertions(+), 264 deletions(-)
diff --git a/bin/json2dot.py b/bin/json2dot.py
index 85538e7..8a841f8 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -27,7 +27,8 @@ import re
nextIdx = 0
def edgePropertiesString(properties):
- return '/'.join(['SideInput' if x[0] == 'IsSideInput' else x[1].split('.')[-1] for x in sorted(properties.items())])
+ prop = {p[0]: p[1] for p in properties.items() if p[0] != 'Coder'}
+ return '/'.join(['SideInput' if x[0] == 'IsSideInput' else x[1].split('.')[-1] for x in sorted(prop.items())])
def getIdx():
global nextIdx
@@ -280,10 +281,6 @@ def Edge(src, dst, properties):
except:
pass
try:
- return StageEdge(src, dst, properties)
- except:
- pass
- try:
return RuntimeEdge(src, dst, properties)
except:
pass
@@ -308,7 +305,7 @@ class IREdge:
self.dst = dst
self.id = properties['id']
self.executionProperties = properties['executionProperties']
- self.coder = properties['coder']
+ self.coder = self.executionProperties['Coder']
@property
def dot(self):
src = self.src
@@ -327,37 +324,14 @@ class IREdge:
class StageEdge:
def __init__(self, src, dst, properties):
- self.src = src
- self.dst = dst
- self.runtimeEdgeId = properties['runtimeEdgeId']
- self.edgeProperties = properties['edgeProperties']
- self.externalVertexAttr = properties['externalVertexAttr']
- self.parallelism = self.externalVertexAttr['Parallelism']
- self.coder = properties['coder']
- @property
- def dot(self):
- color = 'black'
- try:
- if self.externalVertexAttr['ContainerType'] == 'Transient':
- color = 'orange'
- if self.externalVertexAttr['ContainerType'] == 'Reserved':
- color = 'green'
- except:
- pass
- label = '{} (p{})<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, self.parallelism, edgePropertiesString(self.edgeProperties), self.coder)
- return '{} -> {} [ltail = {}, lhead = {}, label = <{}>, color = {}];'.format(self.src.oneVertex.idx,
- self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label, color)
-
-class StageEdge:
- def __init__(self, src, dst, properties):
self.src = src.internalDAG.vertices[properties['srcVertex']]
self.dst = dst.internalDAG.vertices[properties['dstVertex']]
self.runtimeEdgeId = properties['runtimeEdgeId']
- self.edgeProperties = properties['edgeProperties']
- self.coder = properties['coder']
+ self.executionProperties = properties['executionProperties']
+ self.coder = self.executionProperties['Coder']
@property
def dot(self):
- label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.edgeProperties), self.coder)
+ label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.coder)
return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label)
@@ -366,11 +340,11 @@ class RuntimeEdge:
self.src = src
self.dst = dst
self.runtimeEdgeId = properties['runtimeEdgeId']
- self.edgeProperties = properties['edgeProperties']
- self.coder = properties['coder']
+ self.executionProperties = properties['executionProperties']
+ self.coder = self.executionProperties['Coder']
@property
def dot(self):
- label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.edgeProperties), self.coder)
+ label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.coder)
return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label)
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/Coder.java b/common/src/main/java/edu/snu/nemo/common/coder/Coder.java
index c0d9eb4..4507143 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/Coder.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/Coder.java
@@ -21,7 +21,8 @@ import java.io.OutputStream;
import java.io.Serializable;
/**
- * A {@link Coder Coder<T>} object encodes or decodes values of type {@code T} into byte streams.
+ * A coder object encodes or decodes values of type {@code T} into byte streams.
+ *
* @param <T> element type.
*/
public interface Coder<T> extends Serializable {
@@ -31,7 +32,7 @@ public interface Coder<T> extends Serializable {
* Because the user can want to keep a single output stream and continuously concatenate elements,
* the output stream should not be closed.
*
- * @param element the element to be encoded
+ * @param element the element to be encoded
* @param outStream the stream on which encoded bytes are written
* @throws IOException if fail to encode
*/
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
index 9dbaabb..cfb6497 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
@@ -15,7 +15,6 @@
*/
package edu.snu.nemo.common.ir.edge;
-import edu.snu.nemo.common.coder.Coder;
import edu.snu.nemo.common.dag.Edge;
import edu.snu.nemo.common.ir.IdManager;
import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
@@ -30,44 +29,42 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
*/
public final class IREdge extends Edge<IRVertex> {
private final ExecutionPropertyMap executionProperties;
- private final Coder coder;
private final Boolean isSideInput;
/**
* Constructor of IREdge.
+ * This constructor assumes that this edge is not for a side input.
+ *
* @param commPattern data communication pattern type of the edge.
- * @param src source vertex.
- * @param dst destination vertex.
- * @param coder coder.
+ * @param src source vertex.
+ * @param dst destination vertex.
*/
public IREdge(final DataCommunicationPatternProperty.Value commPattern,
final IRVertex src,
- final IRVertex dst,
- final Coder coder) {
- this(commPattern, src, dst, coder, false);
+ final IRVertex dst) {
+ this(commPattern, src, dst, false);
}
/**
* Constructor of IREdge.
+ *
* @param commPattern data communication pattern type of the edge.
- * @param src source vertex.
- * @param dst destination vertex.
- * @param coder coder.
+ * @param src source vertex.
+ * @param dst destination vertex.
* @param isSideInput flag for whether or not the edge is a sideInput.
*/
public IREdge(final DataCommunicationPatternProperty.Value commPattern,
final IRVertex src,
final IRVertex dst,
- final Coder coder,
final Boolean isSideInput) {
super(IdManager.newEdgeId(), src, dst);
- this.coder = coder;
this.isSideInput = isSideInput;
this.executionProperties = ExecutionPropertyMap.of(this, commPattern);
}
/**
* Set an executionProperty of the IREdge.
+ *
* @param executionProperty the execution property.
* @return the IREdge with the execution property set.
*/
@@ -78,7 +75,8 @@ public final class IREdge extends Edge<IRVertex> {
/**
* Get the executionProperty of the IREdge.
- * @param <T> Type of the return value.
+ *
+ * @param <T> Type of the return value.
* @param executionPropertyKey key of the execution property.
* @return the execution property.
*/
@@ -94,13 +92,6 @@ public final class IREdge extends Edge<IRVertex> {
}
/**
- * @return coder for the edge.
- */
- public Coder getCoder() {
- return coder;
- }
-
- /**
* @return whether or not the edge is a side input edge.
*/
public Boolean isSideInput() {
@@ -117,6 +108,7 @@ public final class IREdge extends Edge<IRVertex> {
/**
* Static function to copy executionProperties from an edge to the other.
+ *
* @param thatEdge the edge to copy executionProperties to.
*/
public void copyExecutionPropertiesTo(final IREdge thatEdge) {
@@ -132,7 +124,7 @@ public final class IREdge extends Edge<IRVertex> {
return false;
}
- IREdge irEdge = (IREdge) o;
+ final IREdge irEdge = (IREdge) o;
return executionProperties.equals(irEdge.getExecutionProperties()) && hasSameItineraryAs(irEdge);
}
@@ -151,8 +143,7 @@ public final class IREdge extends Edge<IRVertex> {
final StringBuilder sb = new StringBuilder();
sb.append("{\"id\": \"").append(getId());
sb.append("\", \"executionProperties\": ").append(executionProperties);
- sb.append(", \"coder\": \"").append(coder.toString());
- sb.append("\"}");
+ sb.append("}");
return sb.toString();
}
}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
similarity index 73%
copy from common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java
copy to common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
index eaf73a6..1d2e5dc 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
@@ -15,27 +15,29 @@
*/
package edu.snu.nemo.common.ir.edge.executionproperty;
-import edu.snu.nemo.common.KeyExtractor;
+import edu.snu.nemo.common.coder.Coder;
import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
/**
- * KeyExtractor ExecutionProperty.
+ * Coder ExecutionProperty.
*/
-public final class KeyExtractorProperty extends ExecutionProperty<KeyExtractor> {
+public final class CoderProperty extends ExecutionProperty<Coder> {
/**
* Constructor.
+ *
* @param value value of the execution property.
*/
- private KeyExtractorProperty(final KeyExtractor value) {
- super(Key.KeyExtractor, value);
+ private CoderProperty(final Coder value) {
+ super(Key.Coder, value);
}
/**
* Static method exposing the constructor.
+ *
* @param value value of the new execution property.
* @return the newly created execution property.
*/
- public static KeyExtractorProperty of(final KeyExtractor value) {
- return new KeyExtractorProperty(value);
+ public static CoderProperty of(final Coder value) {
+ return new CoderProperty(value);
}
}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java
index eaf73a6..0387e07 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java
@@ -24,6 +24,7 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
public final class KeyExtractorProperty extends ExecutionProperty<KeyExtractor> {
/**
* Constructor.
+ *
* @param value value of the execution property.
*/
private KeyExtractorProperty(final KeyExtractor value) {
@@ -32,6 +33,7 @@ public final class KeyExtractorProperty extends ExecutionProperty<KeyExtractor>
/**
* Static method exposing the constructor.
+ *
* @param value value of the new execution property.
* @return the newly created execution property.
*/
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java
index ef55daa..3bb4a4f 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java
@@ -16,6 +16,7 @@
package edu.snu.nemo.common.ir.executionproperty;
import java.io.Serializable;
+import java.util.Objects;
/**
* An abstract class for each execution factors.
@@ -59,6 +60,24 @@ public abstract class ExecutionProperty<T> implements Serializable {
};
}
+ @Override
+ public final boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final ExecutionProperty<?> that = (ExecutionProperty<?>) o;
+ return getKey() == that.getKey()
+ && Objects.equals(getValue(), that.getValue());
+ }
+
+ @Override
+ public final int hashCode() {
+ return Objects.hash(getKey(), getValue());
+ }
+
/**
* Key for different types of execution property.
*/
@@ -73,6 +92,7 @@ public abstract class ExecutionProperty<T> implements Serializable {
UsedDataHandling,
Compression,
DuplicateEdgeGroup,
+ Coder,
// Applies to IRVertex
DynamicOptimizationType,
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
index 75f92cd..78d153a 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
@@ -218,7 +218,7 @@ public final class LoopVertex extends IRVertex {
dagToAdd.getIncomingEdgesOf(irVertex).forEach(edge -> {
final IRVertex newSrc = originalToNewIRVertex.get(edge.getSrc());
final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- newSrc, newIrVertex, edge.getCoder(), edge.isSideInput());
+ newSrc, newIrVertex, edge.isSideInput());
edge.copyExecutionPropertiesTo(newIrEdge);
dagBuilder.connectVertices(newIrEdge);
});
@@ -227,7 +227,7 @@ public final class LoopVertex extends IRVertex {
// process DAG incoming edges.
getDagIncomingEdges().forEach((dstVertex, irEdges) -> irEdges.forEach(edge -> {
final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- edge.getSrc(), originalToNewIRVertex.get(dstVertex), edge.getCoder(), edge.isSideInput());
+ edge.getSrc(), originalToNewIRVertex.get(dstVertex), edge.isSideInput());
edge.copyExecutionPropertiesTo(newIrEdge);
dagBuilder.connectVertices(newIrEdge);
}));
@@ -236,7 +236,7 @@ public final class LoopVertex extends IRVertex {
// if termination condition met, we process the DAG outgoing edge.
getDagOutgoingEdges().forEach((srcVertex, irEdges) -> irEdges.forEach(edge -> {
final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- originalToNewIRVertex.get(srcVertex), edge.getDst(), edge.getCoder(), edge.isSideInput());
+ originalToNewIRVertex.get(srcVertex), edge.getDst(), edge.isSideInput());
edge.copyExecutionPropertiesTo(newIrEdge);
dagBuilder.addVertex(edge.getDst()).connectVertices(newIrEdge);
}));
@@ -247,7 +247,7 @@ public final class LoopVertex extends IRVertex {
this.nonIterativeIncomingEdges.forEach((dstVertex, irEdges) -> irEdges.forEach(this::addDagIncomingEdge));
this.iterativeIncomingEdges.forEach((dstVertex, irEdges) -> irEdges.forEach(edge -> {
final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- originalToNewIRVertex.get(edge.getSrc()), dstVertex, edge.getCoder(), edge.isSideInput());
+ originalToNewIRVertex.get(edge.getSrc()), dstVertex, edge.isSideInput());
edge.copyExecutionPropertiesTo(newIrEdge);
this.addDagIncomingEdge(newIrEdge);
}));
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
index f61adc6..3a7f9bd 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
@@ -15,6 +15,7 @@
*/
package edu.snu.nemo.compiler.frontend.beam;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
import edu.snu.nemo.common.ir.vertex.transform.Transform;
import edu.snu.nemo.compiler.frontend.beam.coder.BeamCoder;
import edu.snu.nemo.common.dag.DAGBuilder;
@@ -105,8 +106,8 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults
beamNode.getInputs().values().stream().filter(pValueToVertex::containsKey)
.forEach(pValue -> {
final IRVertex src = pValueToVertex.get(pValue);
- final BeamCoder coder = pValueToCoder.get(pValue);
- final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex), src, irVertex, coder);
+ final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex), src, irVertex);
+ edge.setProperty(CoderProperty.of(pValueToCoder.get(pValue)));
edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
this.builder.connectVertices(edge);
});
@@ -200,9 +201,9 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults
sideInputs.stream().filter(pValueToVertex::containsKey)
.forEach(pValue -> {
final IRVertex src = pValueToVertex.get(pValue);
- final BeamCoder coder = pValueToCoder.get(pValue);
final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex),
- src, irVertex, coder, true);
+ src, irVertex, true);
+ edge.setProperty(CoderProperty.of(pValueToCoder.get(pValue)));
edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
builder.connectVertices(edge);
});
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
index 1cf535a..8ad00d3 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
@@ -19,6 +19,7 @@ import edu.snu.nemo.client.JobLauncher;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.DAGBuilder;
import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
@@ -53,6 +54,8 @@ import java.util.Stack;
* Utility class for RDDs.
*/
public final class SparkFrontendUtils {
+ private static final KeyExtractorProperty SPARK_KEY_EXTRACTOR_PROP = KeyExtractorProperty.of(new SparkKeyExtractor());
+
/**
* Private constructor.
*/
@@ -96,8 +99,9 @@ public final class SparkFrontendUtils {
builder.addVertex(collectVertex, loopVertexStack);
final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex, collectVertex),
- lastVertex, collectVertex, new SparkCoder(serializer));
- newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor()));
+ lastVertex, collectVertex);
+ newEdge.setProperty(CoderProperty.of(new SparkCoder(serializer)));
+ newEdge.setProperty(SPARK_KEY_EXTRACTOR_PROP);
builder.connectVertices(newEdge);
// launch DAG
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
index bb1c496..0b61d7b 100644
--- a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
@@ -19,7 +19,8 @@ import java.util
import edu.snu.nemo.common.dag.DAGBuilder
import edu.snu.nemo.common.ir.edge.IREdge
-import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty
+import edu.snu.nemo.common.ir.edge.executionproperty.{CoderProperty, KeyExtractorProperty}
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty
import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor
import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder
@@ -69,7 +70,9 @@ final class PairRDDFunctions[K: ClassTag, V: ClassTag] protected[rdd] (
builder.addVertex(reduceByKeyVertex, loopVertexStack)
val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(self.lastVertex, reduceByKeyVertex),
- self.lastVertex, reduceByKeyVertex, new SparkCoder[Tuple2[K, V]](self.serializer))
+ self.lastVertex, reduceByKeyVertex)
+ newEdge.setProperty(
+ CoderProperty.of(new SparkCoder[Tuple2[K, V]](self.serializer)).asInstanceOf[ExecutionProperty[_]])
newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
builder.connectVertices(newEdge)
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
index cfa144d..12318c5 100644
--- a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
@@ -20,7 +20,8 @@ import java.util
import edu.snu.nemo.client.JobLauncher
import edu.snu.nemo.common.dag.{DAG, DAGBuilder}
import edu.snu.nemo.common.ir.edge.IREdge
-import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty
+import edu.snu.nemo.common.ir.edge.executionproperty.{CoderProperty, KeyExtractorProperty}
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty
import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor
import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder
@@ -48,8 +49,11 @@ final class RDD[T: ClassTag] protected[rdd] (
protected[rdd] val lastVertex: IRVertex,
private val sourceRDD: Option[org.apache.spark.rdd.RDD[T]]) extends org.apache.spark.rdd.RDD[T](_sc, deps) {
- private val loopVertexStack = new util.Stack[LoopVertex]
protected[rdd] val serializer: Serializer = SparkFrontendUtils.deriveSerializerFrom(_sc)
+ private val loopVertexStack = new util.Stack[LoopVertex]
+ private val coderProperty: ExecutionProperty[_] =
+ CoderProperty.of(new SparkCoder[T](serializer)).asInstanceOf[ExecutionProperty[_]]
+ private val keyExtractorProperty: KeyExtractorProperty = KeyExtractorProperty.of(new SparkKeyExtractor)
/**
* Constructor without dependencies (not needed in Nemo RDD).
@@ -132,8 +136,9 @@ final class RDD[T: ClassTag] protected[rdd] (
builder.addVertex(mapVertex, loopVertexStack)
val newEdge: IREdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, mapVertex),
- lastVertex, mapVertex, new SparkCoder[T](serializer))
- newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+ lastVertex, mapVertex)
+ newEdge.setProperty(coderProperty)
+ newEdge.setProperty(keyExtractorProperty)
builder.connectVertices(newEdge)
new RDD[U](_sc, builder.buildWithoutSourceSinkCheck, mapVertex, Option.empty)
@@ -150,8 +155,9 @@ final class RDD[T: ClassTag] protected[rdd] (
builder.addVertex(flatMapVertex, loopVertexStack)
val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, flatMapVertex),
- lastVertex, flatMapVertex, new SparkCoder[T](serializer))
- newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+ lastVertex, flatMapVertex)
+ newEdge.setProperty(coderProperty)
+ newEdge.setProperty(keyExtractorProperty)
builder.connectVertices(newEdge)
new RDD[U](_sc, builder.buildWithoutSourceSinkCheck, flatMapVertex, Option.empty)
@@ -179,8 +185,9 @@ final class RDD[T: ClassTag] protected[rdd] (
builder.addVertex(reduceVertex, loopVertexStack)
val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, reduceVertex),
- lastVertex, reduceVertex, new SparkCoder[T](serializer))
- newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+ lastVertex, reduceVertex)
+ newEdge.setProperty(coderProperty)
+ newEdge.setProperty(keyExtractorProperty)
builder.connectVertices(newEdge)
ReduceTransform.reduceIterator(
@@ -202,8 +209,9 @@ final class RDD[T: ClassTag] protected[rdd] (
builder.addVertex(flatMapVertex, loopVertexStack)
val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, flatMapVertex),
- lastVertex, flatMapVertex, new SparkCoder[T](serializer))
- newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+ lastVertex, flatMapVertex)
+ newEdge.setProperty(coderProperty)
+ newEdge.setProperty(keyExtractorProperty)
builder.connectVertices(newEdge)
JobLauncher.launchDAG(builder.build)
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/CompiletimeOptimizer.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/CompiletimeOptimizer.java
index cadb443..0213242 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/CompiletimeOptimizer.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/CompiletimeOptimizer.java
@@ -71,7 +71,7 @@ public final class CompiletimeOptimizer {
if ((passToApply instanceof AnnotatingPass && !checkAnnotatingPass(dag, processedDAG))
|| (passToApply instanceof ReshapingPass && !checkReshapingPass(dag, processedDAG))) {
throw new CompileTimeOptimizationException(passToApply.getClass().getSimpleName()
- + "is implemented in a way that doesn't follow its original intention of annotating or reshaping. "
+ + " is implemented in a way that doesn't follow its original intention of annotating or reshaping. "
+ "Modify it or use a general CompileTimePass");
}
// Save the processed JSON DAG.
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java
index 6695ec1..8225df2 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java
@@ -15,7 +15,6 @@
*/
package edu.snu.nemo.compiler.optimizer.examples;
-import edu.snu.nemo.common.coder.Coder;
import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.DAGBuilder;
@@ -58,12 +57,10 @@ public final class MapReduceDisaggregationOptimization {
builder.addVertex(map);
builder.addVertex(reduce);
- final IREdge edge1 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
- source, map, Coder.DUMMY_CODER);
+ final IREdge edge1 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map);
builder.connectVertices(edge1);
- final IREdge edge2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
- map, reduce, Coder.DUMMY_CODER);
+ final IREdge edge2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map, reduce);
builder.connectVertices(edge2);
final DAG<IRVertex, IREdge> dag = builder.build();
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java
new file mode 100644
index 0000000..d4efdda
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+
+import java.util.Collections;
+
+/**
+ * Pass for initiating IREdge Coder ExecutionProperty with default dummy coder.
+ */
+public final class DefaultEdgeCoderPass extends AnnotatingPass {
+
+ private static final CoderProperty DEFAULT_CODER_PROPERTY = CoderProperty.of(Coder.DUMMY_CODER);
+
+ /**
+ * Default constructor.
+ */
+ public DefaultEdgeCoderPass() {
+ super(ExecutionProperty.Key.Coder, Collections.emptySet());
+ }
+
+ @Override
+ public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+ dag.topologicalDo(irVertex ->
+ dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
+ if (irEdge.getProperty(ExecutionProperty.Key.Coder) == null) {
+ irEdge.setProperty(DEFAULT_CODER_PROPERTY);
+ }
+ }));
+ return dag;
+ }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
index d0de39d..5a5949c 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
@@ -31,6 +31,7 @@ public final class PrimitiveCompositePass extends CompositePass {
public PrimitiveCompositePass() {
super(Arrays.asList(
new DefaultParallelismPass(), // annotating after reshaping passes, before stage partitioning
+ new DefaultEdgeCoderPass(),
new DefaultStagePartitioningPass(),
new ReviseInterStageEdgeDataStorePass(), // after stage partitioning
new DefaultEdgeUsedDataHandlingPass(),
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
index 18757ae..117e1af 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
@@ -16,6 +16,7 @@
package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.OperatorVertex;
import edu.snu.nemo.common.ir.vertex.transform.Transform;
@@ -147,7 +148,8 @@ public final class CommonSubexpressionEliminationPass extends ReshapingPass {
outEdges.getOrDefault(ov, new HashSet<>()).forEach(e -> {
outListToModify.remove(e);
final IREdge newIrEdge = new IREdge(e.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- operatorVertexToUse, e.getDst(), e.getCoder());
+ operatorVertexToUse, e.getDst());
+ newIrEdge.setProperty(CoderProperty.of(e.getProperty(ExecutionProperty.Key.Coder)));
outListToModify.add(newIrEdge);
});
outEdges.remove(ov);
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
index 32f8b5d..88e16d6 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
@@ -18,6 +18,7 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.DAGBuilder;
import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
@@ -65,10 +66,11 @@ public final class DataSkewReshapingPass extends ReshapingPass {
.equals(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern))) {
// We then insert the dynamicOptimizationVertex between the vertex and incoming vertices.
final IREdge newEdge = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
- edge.getSrc(), metricCollectionBarrierVertex, edge.getCoder());
+ edge.getSrc(), metricCollectionBarrierVertex);
+ newEdge.setProperty(CoderProperty.of(edge.getProperty(ExecutionProperty.Key.Coder)));
final IREdge edgeToGbK = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- metricCollectionBarrierVertex, v, edge.getCoder(), edge.isSideInput());
+ metricCollectionBarrierVertex, v, edge.isSideInput());
edge.copyExecutionPropertiesTo(edgeToGbK);
builder.connectVertices(newEdge);
builder.connectVertices(edgeToGbK);
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
index 02ede77..b913c77 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
@@ -61,7 +61,7 @@ public final class LoopExtractionPass extends ReshapingPass {
/**
* This part groups each iteration of loops together by observing the LoopVertex assigned to primitive operators,
- * which is assigned by the {@link edu.snu.nemo.client.beam.NemoPipelineVisitor}. This also shows in which depth of
+ * which is assigned by the NemoPipelineVisitor. This also shows in which depth of
* nested loops the function handles. It recursively calls itself from the maximum depth until 0.
* @param dag DAG to process
* @param depth the depth of the stack to process. Must be greater than 0.
@@ -100,7 +100,7 @@ public final class LoopExtractionPass extends ReshapingPass {
srcLoopVertex.addDagOutgoingEdge(irEdge);
final IREdge edgeFromLoop =
new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- srcLoopVertex, operatorVertex, irEdge.getCoder(), irEdge.isSideInput());
+ srcLoopVertex, operatorVertex, irEdge.isSideInput());
irEdge.copyExecutionPropertiesTo(edgeFromLoop);
builder.connectVertices(edgeFromLoop);
srcLoopVertex.mapEdgeWithLoop(edgeFromLoop, irEdge);
@@ -148,7 +148,7 @@ public final class LoopExtractionPass extends ReshapingPass {
} else { // loop -> loop connection
assignedLoopVertex.addDagIncomingEdge(irEdge);
final IREdge edgeToLoop = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- srcLoopVertex, assignedLoopVertex, irEdge.getCoder(), irEdge.isSideInput());
+ srcLoopVertex, assignedLoopVertex, irEdge.isSideInput());
irEdge.copyExecutionPropertiesTo(edgeToLoop);
builder.connectVertices(edgeToLoop);
assignedLoopVertex.mapEdgeWithLoop(edgeToLoop, irEdge);
@@ -156,7 +156,7 @@ public final class LoopExtractionPass extends ReshapingPass {
} else { // operator -> loop
assignedLoopVertex.addDagIncomingEdge(irEdge);
final IREdge edgeToLoop = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- irEdge.getSrc(), assignedLoopVertex, irEdge.getCoder(), irEdge.isSideInput());
+ irEdge.getSrc(), assignedLoopVertex, irEdge.isSideInput());
irEdge.copyExecutionPropertiesTo(edgeToLoop);
builder.connectVertices(edgeToLoop);
assignedLoopVertex.mapEdgeWithLoop(edgeToLoop, irEdge);
@@ -227,13 +227,13 @@ public final class LoopExtractionPass extends ReshapingPass {
// add the new IREdge to the iterative incoming edges list.
final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- equivalentSrcVertex, equivalentDstVertex, edge.getCoder(), edge.isSideInput());
+ equivalentSrcVertex, equivalentDstVertex, edge.isSideInput());
edge.copyExecutionPropertiesTo(newIrEdge);
finalRootLoopVertex.addIterativeIncomingEdge(newIrEdge);
} else {
// src is from outside the previous loop. vertex outside previous loop -> DAG.
final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- srcVertex, equivalentDstVertex, edge.getCoder(), edge.isSideInput());
+ srcVertex, equivalentDstVertex, edge.isSideInput());
edge.copyExecutionPropertiesTo(newIrEdge);
finalRootLoopVertex.addNonIterativeIncomingEdge(newIrEdge);
}
@@ -246,7 +246,7 @@ public final class LoopExtractionPass extends ReshapingPass {
final IRVertex equivalentSrcVertex = equivalentVertices.get(srcVertex);
final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- equivalentSrcVertex, dstVertex, edge.getCoder(), edge.isSideInput());
+ equivalentSrcVertex, dstVertex, edge.isSideInput());
edge.copyExecutionPropertiesTo(newIrEdge);
finalRootLoopVertex.addDagOutgoingEdge(newIrEdge);
finalRootLoopVertex.mapEdgeWithLoop(loopVertex.getEdgeWithLoop(edge), newIrEdge);
@@ -291,7 +291,7 @@ public final class LoopExtractionPass extends ReshapingPass {
builder.connectVertices(edge);
} else {
final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- firstEquivalentVertex, irVertex, edge.getCoder(), edge.isSideInput());
+ firstEquivalentVertex, irVertex, edge.isSideInput());
edge.copyExecutionPropertiesTo(newIrEdge);
builder.connectVertices(newIrEdge);
}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
index d2aaeee..1e5f23f 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
@@ -16,6 +16,7 @@
package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.LoopVertex;
import edu.snu.nemo.common.dag.DAG;
@@ -159,7 +160,7 @@ public final class LoopOptimizations {
inEdges.getOrDefault(loopVertex, new ArrayList<>()).forEach(irEdge -> {
if (builder.contains(irEdge.getSrc())) {
final IREdge newIREdge = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- irEdge.getSrc(), newLoopVertex, irEdge.getCoder(), irEdge.isSideInput());
+ irEdge.getSrc(), newLoopVertex, irEdge.isSideInput());
irEdge.copyExecutionPropertiesTo(newIREdge);
builder.connectVertices(newIREdge);
}
@@ -168,7 +169,7 @@ public final class LoopOptimizations {
outEdges.getOrDefault(loopVertex, new ArrayList<>()).forEach(irEdge -> {
if (builder.contains(irEdge.getDst())) {
final IREdge newIREdge = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- newLoopVertex, irEdge.getDst(), irEdge.getCoder(), irEdge.isSideInput());
+ newLoopVertex, irEdge.getDst(), irEdge.isSideInput());
irEdge.copyExecutionPropertiesTo(newIREdge);
builder.connectVertices(newIREdge);
}
@@ -285,8 +286,10 @@ public final class LoopOptimizations {
candidate.getValue().stream().map(IREdge::getSrc).anyMatch(edgeSrc -> edgeSrc.equals(e.getSrc())))
.forEach(edge -> {
edgesToRemove.add(edge);
- edgesToAdd.add(new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- candidate.getKey(), edge.getDst(), edge.getCoder(), edge.isSideInput()));
+ final IREdge newEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+ candidate.getKey(), edge.getDst(), edge.isSideInput());
+ newEdge.setProperty(CoderProperty.of(edge.getProperty(ExecutionProperty.Key.Coder)));
+ edgesToAdd.add(newEdge);
});
final List<IREdge> listToModify = inEdges.getOrDefault(loopVertex, new ArrayList<>());
listToModify.removeAll(edgesToRemove);
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
index 7770065..25f941c 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
@@ -17,6 +17,7 @@ package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
@@ -59,10 +60,11 @@ public final class SailfishRelayReshapingPass extends ReshapingPass {
final OperatorVertex iFileMergerVertex = new OperatorVertex(new RelayTransform());
builder.addVertex(iFileMergerVertex);
final IREdge newEdgeToMerger = new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
- edge.getSrc(), iFileMergerVertex, edge.getCoder(), edge.isSideInput());
- final IREdge newEdgeFromMerger = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
- iFileMergerVertex, v, edge.getCoder());
+ edge.getSrc(), iFileMergerVertex, edge.isSideInput());
edge.copyExecutionPropertiesTo(newEdgeToMerger);
+ final IREdge newEdgeFromMerger = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
+ iFileMergerVertex, v);
+ newEdgeFromMerger.setProperty(CoderProperty.of(edge.getProperty(ExecutionProperty.Key.Coder)));
builder.connectVertices(newEdgeToMerger);
builder.connectVertices(newEdgeFromMerger);
} else {
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index adaf752..3e791dc 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -186,8 +186,7 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
irEdge.getId(),
irEdge.getExecutionProperties(),
irEdge.getSrc(),
- irEdge.getDst(),
- irEdge.getCoder()));
+ irEdge.getDst()));
} else { // edge comes from another stage
final Stage srcStage = vertexStageMap.get(srcVertex);
@@ -201,7 +200,6 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
.setSrcVertex(srcVertex)
.setDstVertex(dstVertex)
.setSrcStage(srcStage)
- .setCoder(irEdge.getCoder())
.setSideInputFlag(irEdge.isSideInput());
currentStageIncomingEdges.add(newEdgeBuilder);
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
index c86d1e7..528beb8 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
@@ -15,7 +15,6 @@
*/
package edu.snu.nemo.runtime.common.plan;
-import edu.snu.nemo.common.coder.Coder;
import edu.snu.nemo.common.dag.Edge;
import edu.snu.nemo.common.dag.Vertex;
import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
@@ -26,69 +25,60 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
* @param <V> the vertex type.
*/
public class RuntimeEdge<V extends Vertex> extends Edge<V> {
- private final ExecutionPropertyMap edgeProperties;
- private final Coder coder;
+ private final ExecutionPropertyMap executionProperties;
private final Boolean isSideInput;
/**
* Constructs the edge given the below parameters.
- * @param runtimeEdgeId the id of this edge.
- * @param edgeProperties to control the data flow on this edge.
- * @param src the source vertex.
- * @param dst the destination vertex.
- * @param coder coder.
+ * This constructor assumes that this edge is not for a side input.
+ *
+ * @param runtimeEdgeId the id of this edge.
+ * @param executionProperties to control the data flow on this edge.
+ * @param src the source vertex.
+ * @param dst the destination vertex.
*/
public RuntimeEdge(final String runtimeEdgeId,
- final ExecutionPropertyMap edgeProperties,
+ final ExecutionPropertyMap executionProperties,
final V src,
- final V dst,
- final Coder coder) {
- this(runtimeEdgeId, edgeProperties, src, dst, coder, false);
+ final V dst) {
+ this(runtimeEdgeId, executionProperties, src, dst, false);
}
/**
* Constructs the edge given the below parameters.
- * @param runtimeEdgeId the id of this edge.
- * @param edgeProperties to control the data flow on this edge.
- * @param src the source vertex.
- * @param dst the destination vertex.
- * @param coder coder.
- * @param isSideInput Whether or not the RuntimeEdge is a side input edge.
+ *
+ * @param runtimeEdgeId the id of this edge.
+ * @param executionProperties to control the data flow on this edge.
+ * @param src the source vertex.
+ * @param dst the destination vertex.
+ * @param isSideInput Whether or not the RuntimeEdge is a side input edge.
*/
public RuntimeEdge(final String runtimeEdgeId,
- final ExecutionPropertyMap edgeProperties,
+ final ExecutionPropertyMap executionProperties,
final V src,
final V dst,
- final Coder coder,
final Boolean isSideInput) {
super(runtimeEdgeId, src, dst);
- this.edgeProperties = edgeProperties;
- this.coder = coder;
+ this.executionProperties = executionProperties;
this.isSideInput = isSideInput;
}
/**
* Get the execution property of the Runtime Edge.
- * @param <T> Type of the return value.
+ *
+ * @param <T> Type of the return value.
* @param executionPropertyKey key of the execution property.
* @return the execution property.
*/
public final <T> T getProperty(final ExecutionProperty.Key executionPropertyKey) {
- return edgeProperties.get(executionPropertyKey);
+ return executionProperties.get(executionPropertyKey);
}
/**
* @return the ExecutionPropertyMap of the Runtime Edge.
*/
public final ExecutionPropertyMap getExecutionProperties() {
- return edgeProperties;
- }
-
- /**
- * @return the coder for encoding and decoding.
- */
- public final Coder getCoder() {
- return coder;
+ return executionProperties;
}
/**
@@ -106,9 +96,8 @@ public class RuntimeEdge<V extends Vertex> extends Edge<V> {
public String propertiesToJSON() {
final StringBuilder sb = new StringBuilder();
sb.append("{\"runtimeEdgeId\": \"").append(getId());
- sb.append("\", \"edgeProperties\": ").append(edgeProperties);
- sb.append(", \"coder\": \"").append(coder.toString());
- sb.append("\"}");
+ sb.append("\", \"executionProperties\": ").append(executionProperties);
+ sb.append("}");
return sb.toString();
}
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 3b8cdf0..ec73077 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -15,7 +15,6 @@
*/
package edu.snu.nemo.runtime.common.plan;
-import edu.snu.nemo.common.coder.Coder;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.runtime.common.data.KeyRange;
@@ -48,24 +47,23 @@ public final class StageEdge extends RuntimeEdge<Stage> {
/**
* Constructor.
- * @param runtimeEdgeId id of the runtime edge.
+ *
+ * @param runtimeEdgeId id of the runtime edge.
* @param edgeProperties edge execution properties.
- * @param srcVertex source IRVertex in the srcStage of this edge.
- * @param dstVertex destination IRVertex in the dstStage of this edge.
- * @param srcStage source stage.
- * @param dstStage destination stage.
- * @param coder the coder for enconding and deconding.
- * @param isSideInput whether or not the edge is a sideInput edge.
+ * @param srcVertex source IRVertex in the srcStage of this edge.
+ * @param dstVertex destination IRVertex in the dstStage of this edge.
+ * @param srcStage source stage.
+ * @param dstStage destination stage.
+ * @param isSideInput whether or not the edge is a sideInput edge.
*/
- public StageEdge(final String runtimeEdgeId,
- final ExecutionPropertyMap edgeProperties,
- final IRVertex srcVertex,
- final IRVertex dstVertex,
- final Stage srcStage,
- final Stage dstStage,
- final Coder coder,
- final Boolean isSideInput) {
- super(runtimeEdgeId, edgeProperties, srcStage, dstStage, coder, isSideInput);
+ StageEdge(final String runtimeEdgeId,
+ final ExecutionPropertyMap edgeProperties,
+ final IRVertex srcVertex,
+ final IRVertex dstVertex,
+ final Stage srcStage,
+ final Stage dstStage,
+ final Boolean isSideInput) {
+ super(runtimeEdgeId, edgeProperties, srcStage, dstStage, isSideInput);
this.srcVertex = srcVertex;
this.dstVertex = dstVertex;
// Initialize the key range of each dst task.
@@ -96,7 +94,6 @@ public final class StageEdge extends RuntimeEdge<Stage> {
sb.append("\", \"edgeProperties\": ").append(getExecutionProperties());
sb.append(", \"externalSrcVertexId\": \"").append(srcVertex.getId());
sb.append("\", \"externalDstVertexId\": \"").append(dstVertex.getId());
- sb.append("\", \"coder\": \"").append(getCoder().toString());
sb.append("\"}");
return sb.toString();
}
@@ -110,6 +107,7 @@ public final class StageEdge extends RuntimeEdge<Stage> {
/**
* Sets the task idx to key range list.
+ *
* @param taskIdxToKeyRange the list to set.
*/
public void setTaskIdxToKeyRange(final List<KeyRange> taskIdxToKeyRange) {
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
index b7448a1..5630b71 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
@@ -15,8 +15,6 @@
*/
package edu.snu.nemo.runtime.common.plan;
-
-import edu.snu.nemo.common.coder.Coder;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
@@ -30,19 +28,20 @@ public final class StageEdgeBuilder {
private Stage dstStage;
private IRVertex srcVertex;
private IRVertex dstVertex;
- private Coder coder;
private Boolean isSideInput;
/**
* Represents the edge between vertices in a logical plan.
+ *
* @param irEdgeId id of this edge.
*/
- StageEdgeBuilder(final String irEdgeId) {
+ public StageEdgeBuilder(final String irEdgeId) {
this.stageEdgeId = irEdgeId;
}
/**
* Setter for edge properties.
+ *
* @param ea the edge properties.
* @return the updated StageEdgeBuilder.
*/
@@ -53,6 +52,7 @@ public final class StageEdgeBuilder {
/**
* Setter for the source stage.
+ *
* @param ss the source stage.
* @return the updated StageEdgeBuilder.
*/
@@ -63,6 +63,7 @@ public final class StageEdgeBuilder {
/**
* Setter for the destination stage.
+ *
* @param ds the destination stage.
* @return the updated StageEdgeBuilder.
*/
@@ -73,6 +74,7 @@ public final class StageEdgeBuilder {
/**
* Setter for the source vertex.
+ *
* @param sv the source vertex.
* @return the updated StageEdgeBuilder.
*/
@@ -83,6 +85,7 @@ public final class StageEdgeBuilder {
/**
* Setter for the destination vertex.
+ *
* @param dv the destination vertex.
* @return the updated StageEdgeBuilder.
*/
@@ -92,17 +95,8 @@ public final class StageEdgeBuilder {
}
/**
- * Setter for coder.
- * @param c the coder.
- * @return the updated StageEdgeBuilder.
- */
- public StageEdgeBuilder setCoder(final Coder c) {
- this.coder = c;
- return this;
- }
-
- /**
* Setter for side input flag.
+ *
* @param sideInputFlag the side input flag.
* @return the updated StageEdgeBuilder.
*/
@@ -111,7 +105,10 @@ public final class StageEdgeBuilder {
return this;
}
+ /**
+ * @return the built stage edge.
+ */
public StageEdge build() {
- return new StageEdge(stageEdgeId, edgeProperties, srcVertex, dstVertex, srcStage, dstStage, coder, isSideInput);
+ return new StageEdge(stageEdgeId, edgeProperties, srcVertex, dstVertex, srcStage, dstStage, isSideInput);
}
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 8b2925d..155a9b0 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -17,6 +17,7 @@ package edu.snu.nemo.runtime.executor;
import com.google.protobuf.ByteString;
import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.common.exception.IllegalMessageException;
@@ -105,13 +106,13 @@ public final class Executor {
final TaskStateManager taskStateManager =
new TaskStateManager(task, executorId, persistentConnectionToMasterMap, metricMessageSender);
- task.getTaskIncomingEdges()
- .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
- task.getTaskOutgoingEdges()
- .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
+ task.getTaskIncomingEdges().forEach(e -> serializerManager.register(e.getId(),
+ e.getProperty(ExecutionProperty.Key.Coder), e.getExecutionProperties()));
+ task.getTaskOutgoingEdges().forEach(e -> serializerManager.register(e.getId(),
+ e.getProperty(ExecutionProperty.Key.Coder), e.getExecutionProperties()));
irDag.getVertices().forEach(v -> {
- irDag.getOutgoingEdgesOf(v)
- .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
+ irDag.getOutgoingEdgesOf(v).forEach(e -> serializerManager.register(e.getId(),
+ e.getProperty(ExecutionProperty.Key.Coder), e.getExecutionProperties()));
});
new TaskExecutor(
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index f2c0082..2c4a2ac 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -260,10 +260,9 @@ public final class TaskExecutorTest {
final IRVertex dst,
final boolean isSideInput) {
final String runtimeIREdgeId = "Runtime edge between operator tasks";
- final Coder coder = Coder.DUMMY_CODER;
ExecutionPropertyMap edgeProperties = new ExecutionPropertyMap(runtimeIREdgeId);
edgeProperties.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
- return new RuntimeEdge<>(runtimeIREdgeId, edgeProperties, src, dst, coder, isSideInput);
+ return new RuntimeEdge<>(runtimeIREdgeId, edgeProperties, src, dst, isSideInput);
}
diff --git a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
index 9d4142d..4888a1f 100644
--- a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
+++ b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
@@ -15,7 +15,6 @@
*/
package edu.snu.nemo.runtime.plangenerator;
-import edu.snu.nemo.common.coder.Coder;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.DAGBuilder;
import edu.snu.nemo.common.ir.edge.IREdge;
@@ -136,16 +135,16 @@ public final class TestPlanGenerator {
v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
dagBuilder.addVertex(v5);
- final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER);
+ final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2);
dagBuilder.connectVertices(e1);
- final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, Coder.DUMMY_CODER);
+ final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2);
dagBuilder.connectVertices(e2);
- final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, Coder.DUMMY_CODER);
+ final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4);
dagBuilder.connectVertices(e3);
- final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v5, Coder.DUMMY_CODER);
+ final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v5);
dagBuilder.connectVertices(e4);
return dagBuilder.buildWithoutSourceSinkCheck();
@@ -182,10 +181,10 @@ public final class TestPlanGenerator {
}
dagBuilder.addVertex(v3);
- final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER);
+ final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2);
dagBuilder.connectVertices(e1);
- final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, Coder.DUMMY_CODER);
+ final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3);
dagBuilder.connectVertices(e2);
return dagBuilder.buildWithoutSourceSinkCheck();
diff --git a/tests/src/test/java/edu/snu/nemo/tests/common/ir/LoopVertexTest.java b/tests/src/test/java/edu/snu/nemo/tests/common/ir/LoopVertexTest.java
index 0368499..919530d 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/common/ir/LoopVertexTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/common/ir/LoopVertexTest.java
@@ -54,26 +54,17 @@ public class LoopVertexTest {
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
loopDAGBuilder.addVertex(map1).addVertex(groupByKey).addVertex(combine).addVertex(map2)
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
- map1, groupByKey, Coder.DUMMY_CODER))
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
- groupByKey, combine, Coder.DUMMY_CODER))
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
- combine, map2, Coder.DUMMY_CODER));
- loopVertex.addDagIncomingEdge(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
- source, map1, Coder.DUMMY_CODER));
- loopVertex.addIterativeIncomingEdge(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
- map2, map1, Coder.DUMMY_CODER));
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1, groupByKey))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey, combine))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2));
+ loopVertex.addDagIncomingEdge(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1));
+ loopVertex.addIterativeIncomingEdge(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, map2, map1));
originalDAG = builder.addVertex(source).addVertex(map1).addVertex(groupByKey).addVertex(combine).addVertex(map2)
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
- source, map1, Coder.DUMMY_CODER))
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
- map1, groupByKey, Coder.DUMMY_CODER))
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
- groupByKey, combine, Coder.DUMMY_CODER))
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
- combine, map2, Coder.DUMMY_CODER))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1, groupByKey))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey, combine))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2))
.build();
}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/common/ir/executionproperty/ExecutionPropertyMapTest.java b/tests/src/test/java/edu/snu/nemo/tests/common/ir/executionproperty/ExecutionPropertyMapTest.java
index 8f0c153..f6f089f 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/common/ir/executionproperty/ExecutionPropertyMapTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/common/ir/executionproperty/ExecutionPropertyMapTest.java
@@ -17,6 +17,7 @@ package edu.snu.nemo.tests.common.ir.executionproperty;
import edu.snu.nemo.common.coder.Coder;
import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
@@ -39,8 +40,7 @@ public class ExecutionPropertyMapTest {
private final IRVertex source = new EmptyComponents.EmptySourceVertex<>("Source");
private final IRVertex destination = new OperatorVertex(new EmptyComponents.EmptyTransform("MapElements"));
private final DataCommunicationPatternProperty.Value comPattern = DataCommunicationPatternProperty.Value.OneToOne;
- private final IREdge edge = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
- source, destination, Coder.DUMMY_CODER);
+ private final IREdge edge = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, destination);
private ExecutionPropertyMap edgeMap;
private ExecutionPropertyMap vertexMap;
@@ -65,6 +65,8 @@ public class ExecutionPropertyMapTest {
assertEquals(DataStoreProperty.Value.MemoryStore, edgeMap.get(ExecutionProperty.Key.DataStore));
edgeMap.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
assertEquals(DataFlowModelProperty.Value.Pull, edgeMap.get(ExecutionProperty.Key.DataFlowModel));
+ edgeMap.put(CoderProperty.of(Coder.DUMMY_CODER));
+ assertEquals(Coder.DUMMY_CODER, edgeMap.get(ExecutionProperty.Key.Coder));
edgeMap.remove(ExecutionProperty.Key.DataFlowModel);
assertNull(edgeMap.get(ExecutionProperty.Key.DataFlowModel));
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
index ebbc7b6..cb4b70c 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
@@ -20,7 +20,6 @@ import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.OperatorVertex;
-import edu.snu.nemo.common.coder.Coder;
import edu.snu.nemo.compiler.backend.nemo.NemoBackend;
import edu.snu.nemo.common.dag.DAGBuilder;
import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
@@ -54,12 +53,10 @@ public final class NemoBackendTest<I, O> {
@Before
public void setUp() throws Exception {
this.dag = builder.addVertex(source).addVertex(map1).addVertex(groupByKey).addVertex(combine).addVertex(map2)
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1, Coder.DUMMY_CODER))
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
- map1, groupByKey, Coder.DUMMY_CODER))
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
- groupByKey, combine, Coder.DUMMY_CODER))
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2, Coder.DUMMY_CODER))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1, groupByKey))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey, combine))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2))
.build();
this.dag = CompiletimeOptimizer.optimize(dag, new PadoPolicy(), EMPTY_DAG_DIRECTORY);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
new file mode 100644
index 0000000..6eec026
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.tests.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.client.JobLauncher;
+import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeCoderPass;
+import edu.snu.nemo.tests.compiler.CompilerTestUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test {@link edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeCoderPass}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public class DefaultEdgeCoderPassTest {
+ private DAG<IRVertex, IREdge> compiledDAG;
+
+ @Before
+ public void setUp() throws Exception {
+ compiledDAG = CompilerTestUtil.compileMRDAG();
+ }
+
+ @Test
+ public void testAnnotatingPass() {
+ final AnnotatingPass coderPass = new DefaultEdgeCoderPass();
+ assertEquals(ExecutionProperty.Key.Coder, coderPass.getExecutionPropertyToModify());
+ }
+
+ @Test
+ public void testNotOverride() {
+ // Get the first coder from the compiled DAG
+ final Coder compiledCoder = compiledDAG
+ .getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0)).get(0).getProperty(ExecutionProperty.Key.Coder);
+ final DAG<IRVertex, IREdge> processedDAG = new DefaultEdgeCoderPass().apply(compiledDAG);
+
+ // Get the first coder from the processed DAG
+ final Coder processedCoder = processedDAG
+ .getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0)).get(0).getProperty(ExecutionProperty.Key.Coder);
+ assertEquals(compiledCoder, processedCoder); // It must not be changed.
+ }
+
+ @Test
+ public void testSetToDefault() throws Exception {
+ // Remove the first coder from the compiled DAG (to let our pass to set as default coder).
+ compiledDAG.getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0))
+ .get(0).getExecutionProperties().remove(ExecutionProperty.Key.Coder);
+ final DAG<IRVertex, IREdge> processedDAG = new DefaultEdgeCoderPass().apply(compiledDAG);
+
+ // Check whether the pass set the empty coder to our default coder.
+ final Coder processedCoder = processedDAG
+ .getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0)).get(0).getProperty(ExecutionProperty.Key.Coder);
+ assertEquals(Coder.DUMMY_CODER, processedCoder);
+ }
+}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPassTest.java
index 2be50db..6c8a52a 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPassTest.java
@@ -38,7 +38,7 @@ import static org.junit.Assert.assertEquals;
@RunWith(PowerMockRunner.class)
@PrepareForTest(JobLauncher.class)
public class DefaultParallelismPassTest {
- DAG<IRVertex, IREdge> compiledDAG;
+ private DAG<IRVertex, IREdge> compiledDAG;
@Before
public void setUp() throws Exception {
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPassTest.java
index d6f6353..28fc29f 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPassTest.java
@@ -16,7 +16,6 @@
package edu.snu.nemo.tests.compiler.optimizer.pass.compiletime.reshaping;
import edu.snu.nemo.client.JobLauncher;
-import edu.snu.nemo.common.coder.Coder;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
@@ -58,16 +57,16 @@ public class CommonSubexpressionEliminationPassTest {
final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
dagNotToOptimize = dagBuilder.addVertex(source).addVertex(map1).addVertex(groupByKey).addVertex(combine)
.addVertex(map2)
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1, Coder.DUMMY_CODER))
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1, groupByKey, Coder.DUMMY_CODER))
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey, combine, Coder.DUMMY_CODER))
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2, Coder.DUMMY_CODER))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1, groupByKey))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey, combine))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2))
.build();
dagToOptimize = dagBuilder.addVertex(map1clone).addVertex(groupByKey2).addVertex(combine2).addVertex(map22)
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1clone, Coder.DUMMY_CODER))
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1clone, groupByKey2, Coder.DUMMY_CODER))
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey2, combine2, Coder.DUMMY_CODER))
- .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine2, map22, Coder.DUMMY_CODER))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1clone))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1clone, groupByKey2))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey2, combine2))
+ .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine2, map22))
.build();
}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
index e670c4d..159feba 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
@@ -19,6 +19,7 @@ import edu.snu.nemo.client.JobLauncher;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.DAGBuilder;
import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.LoopVertex;
@@ -101,7 +102,8 @@ public class LoopFusionPassTest {
* This method adds a LoopVertex at the end of the DAG (no more outgoing edges), after the
* {@param vertexToBeFollowed}. We assume, as in the MLR, ALS DAG, that iterative incoming edges work to receive
* main inputs, and non-iterative incoming edges work to receive side inputs.
- * @param builder builder to add the LoopVertex to.
+ *
+ * @param builder builder to add the LoopVertex to.
* @param vertexToBeFollowed vertex that is to be followed by the LoopVertex.
* @param loopVertexToFollow the new LoopVertex that will be added.
*/
@@ -111,12 +113,14 @@ public class LoopFusionPassTest {
builder.addVertex(loopVertexToFollow);
loopVertexToFollow.getIterativeIncomingEdges().values().forEach(irEdges -> irEdges.forEach(irEdge -> {
final IREdge newIREdge = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- vertexToBeFollowed, loopVertexToFollow, irEdge.getCoder());
+ vertexToBeFollowed, loopVertexToFollow);
+ newIREdge.setProperty(CoderProperty.of(irEdge.getProperty(ExecutionProperty.Key.Coder)));
builder.connectVertices(newIREdge);
}));
loopVertexToFollow.getNonIterativeIncomingEdges().values().forEach(irEdges -> irEdges.forEach(irEdge -> {
final IREdge newIREdge = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- irEdge.getSrc(), loopVertexToFollow, irEdge.getCoder());
+ irEdge.getSrc(), loopVertexToFollow);
+ newIREdge.setProperty(CoderProperty.of(irEdge.getProperty(ExecutionProperty.Key.Coder)));
builder.connectVertices(newIREdge);
}));
}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
index 4df2fbd..18feb79 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
@@ -19,6 +19,7 @@ import edu.snu.nemo.client.JobLauncher;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.DAGBuilder;
import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.LoopVertex;
@@ -87,11 +88,11 @@ public class LoopInvariantCodeMotionPassTest {
if (!e.getSrc().equals(vertex7)) {
builder.connectVertices(e);
} else {
- final Optional<IREdge> theIncomingEdge = newDAGIncomingEdge.stream().findFirst();
- assertTrue(theIncomingEdge.isPresent());
- final IREdge newIREdge =
- new IREdge(theIncomingEdge.get().getProperty(ExecutionProperty.Key.DataCommunicationPattern),
- theIncomingEdge.get().getSrc(), alsLoop, theIncomingEdge.get().getCoder());
+ final Optional<IREdge> incomingEdge = newDAGIncomingEdge.stream().findFirst();
+ assertTrue(incomingEdge.isPresent());
+ final IREdge newIREdge = new IREdge(incomingEdge.get().getProperty(
+ ExecutionProperty.Key.DataCommunicationPattern), incomingEdge.get().getSrc(), alsLoop);
+ newIREdge.setProperty(CoderProperty.of(incomingEdge.get().getProperty(ExecutionProperty.Key.Coder)));
builder.connectVertices(newIREdge);
}
});
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
index 287e15b..793c182 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -29,21 +29,21 @@ public final class PolicyBuilderTest {
@Test
public void testDisaggregationPolicy() {
final Policy disaggregationPolicy = new DisaggregationPolicy();
- assertEquals(12, disaggregationPolicy.getCompileTimePasses().size());
+ assertEquals(13, disaggregationPolicy.getCompileTimePasses().size());
assertEquals(0, disaggregationPolicy.getRuntimePasses().size());
}
@Test
public void testPadoPolicy() {
final Policy padoPolicy = new PadoPolicy();
- assertEquals(14, padoPolicy.getCompileTimePasses().size());
+ assertEquals(15, padoPolicy.getCompileTimePasses().size());
assertEquals(0, padoPolicy.getRuntimePasses().size());
}
@Test
public void testDataSkewPolicy() {
final Policy dataSkewPolicy = new DataSkewPolicy();
- assertEquals(16, dataSkewPolicy.getCompileTimePasses().size());
+ assertEquals(17, dataSkewPolicy.getCompileTimePasses().size());
assertEquals(1, dataSkewPolicy.getRuntimePasses().size());
}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
index f948589..677b2e9 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
@@ -72,7 +72,7 @@ public final class DAGConverterTest {
v2.setProperty(ParallelismProperty.of(2));
irDAGBuilder.addVertex(v2);
- final IREdge e = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER);
+ final IREdge e = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2);
irDAGBuilder.connectVertices(e);
final DAG<IRVertex, IREdge> irDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
@@ -155,27 +155,27 @@ public final class DAGConverterTest {
// TODO #13: Implement Join Node
// irDAGBuilder.addVertex(v7);
- final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v2, Coder.DUMMY_CODER);
+ final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v2);
e1.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
e1.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
- final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v3, Coder.DUMMY_CODER);
+ final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v3);
e2.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
e2.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
- final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, Coder.DUMMY_CODER);
+ final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4);
e3.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
e3.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push));
- final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v5, Coder.DUMMY_CODER);
+ final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v5);
e4.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
e4.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push));
- final IREdge e5 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v6, Coder.DUMMY_CODER);
+ final IREdge e5 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v6);
e5.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
e5.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
- final IREdge e6 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v8, Coder.DUMMY_CODER);
+ final IREdge e6 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v8);
e6.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
e6.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index 697122e..2a8a224 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@ -39,6 +39,7 @@ import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
import edu.snu.nemo.runtime.common.plan.Stage;
import edu.snu.nemo.runtime.common.plan.StageEdge;
+import edu.snu.nemo.runtime.common.plan.StageEdgeBuilder;
import edu.snu.nemo.runtime.executor.Executor;
import edu.snu.nemo.runtime.executor.MetricManagerWorker;
import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
@@ -308,22 +309,30 @@ public final class DataTransferTest {
final IRVertex dstVertex = verticesPair.right();
// Edge setup
- final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex, CODER);
+ final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex);
+ dummyIREdge.setProperty(CoderProperty.of(CODER));
dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
edgeProperties.put(DataCommunicationPatternProperty.of(commPattern));
edgeProperties.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
-
edgeProperties.put(DataStoreProperty.of(store));
edgeProperties.put(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
+ edgeProperties.put(CoderProperty.of(CODER));
final RuntimeEdge dummyEdge;
final IRVertex srcMockVertex = mock(IRVertex.class);
final IRVertex dstMockVertex = mock(IRVertex.class);
final Stage srcStage = setupStages("srcStage-" + testIndex);
final Stage dstStage = setupStages("dstStage-" + testIndex);
- dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
- srcStage, dstStage, CODER, false);
+ dummyEdge = new StageEdgeBuilder(edgeId)
+ .setEdgeProperties(edgeProperties)
+ .setSrcVertex(srcMockVertex)
+ .setDstVertex(dstMockVertex)
+ .setSrcStage(srcStage)
+ .setDstStage(dstStage)
+ .setSideInputFlag(false)
+ .build();
+
// Initialize states in Master
srcStage.getTaskIds().forEach(srcTaskId -> {
final String blockId = RuntimeIdGenerator.generateBlockId(
@@ -391,7 +400,8 @@ public final class DataTransferTest {
final IRVertex dstVertex = verticesPair.right();
// Edge setup
- final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex, CODER);
+ final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex);
+ dummyIREdge.setProperty(CoderProperty.of(CODER));
dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
edgeProperties.put(DataCommunicationPatternProperty.of(commPattern));
@@ -409,12 +419,24 @@ public final class DataTransferTest {
final IRVertex dstMockVertex = mock(IRVertex.class);
final Stage srcStage = setupStages("srcStage-" + testIndex);
final Stage dstStage = setupStages("dstStage-" + testIndex);
- dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
- srcStage, dstStage, CODER, false);
+ dummyEdge = new StageEdgeBuilder(edgeId)
+ .setEdgeProperties(edgeProperties)
+ .setSrcVertex(srcMockVertex)
+ .setDstVertex(dstMockVertex)
+ .setSrcStage(srcStage)
+ .setDstStage(dstStage)
+ .setSideInputFlag(false)
+ .build();
final IRVertex dstMockVertex2 = mock(IRVertex.class);
final Stage dstStage2 = setupStages("dstStage-" + testIndex2);
- dummyEdge2 = new StageEdge(edgeId2, edgeProperties, srcMockVertex, dstMockVertex2,
- srcStage, dstStage2, CODER, false);
+ dummyEdge2 = new StageEdgeBuilder(edgeId2)
+ .setEdgeProperties(edgeProperties)
+ .setSrcVertex(srcMockVertex)
+ .setDstVertex(dstMockVertex2)
+ .setSrcStage(srcStage)
+ .setDstStage(dstStage2)
+ .setSideInputFlag(false)
+ .build();
// Initialize states in Master
srcStage.getTaskIds().forEach(srcTaskId -> {
final String blockId = RuntimeIdGenerator.generateBlockId(
--
To stop receiving notification emails like this one, please contact
johnyangk@apache.org.