You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/06/15 00:19:29 UTC

[GitHub] johnyangk closed pull request #32: [NEMO-101] Make Coder as an execution property

johnyangk closed pull request #32: [NEMO-101] Make Coder as an execution property
URL: https://github.com/apache/incubator-nemo/pull/32
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a forked repository, GitHub no longer
displays its diff once it has been merged, so the full diff is reproduced below:

diff --git a/bin/json2dot.py b/bin/json2dot.py
index 85538e70..8a841f8a 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -27,7 +27,8 @@
 nextIdx = 0
 
 def edgePropertiesString(properties):
-    return '/'.join(['SideInput' if x[0] == 'IsSideInput' else x[1].split('.')[-1] for x in sorted(properties.items())])
+    prop = {p[0]: p[1] for p in properties.items() if p[0] != 'Coder'}
+    return '/'.join(['SideInput' if x[0] == 'IsSideInput' else x[1].split('.')[-1] for x in sorted(prop.items())])
 
 def getIdx():
     global nextIdx
@@ -275,10 +276,6 @@ def logicalEnd(self):
         return 'cluster_{}'.format(self.idx)
 
 def Edge(src, dst, properties):
-    try:
-        return StageEdge(src, dst, properties)
-    except:
-        pass
     try:
         return StageEdge(src, dst, properties)
     except:
@@ -308,7 +305,7 @@ def __init__(self, src, dst, properties):
         self.dst = dst
         self.id = properties['id']
         self.executionProperties = properties['executionProperties']
-        self.coder = properties['coder']
+        self.coder = self.executionProperties['Coder']
     @property
     def dot(self):
         src = self.src
@@ -325,39 +322,16 @@ def dot(self):
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(src.oneVertex.idx,
                 dst.oneVertex.idx, src.logicalEnd, dst.logicalEnd, label)
 
-class StageEdge:
-    def __init__(self, src, dst, properties):
-        self.src = src
-        self.dst = dst
-        self.runtimeEdgeId = properties['runtimeEdgeId']
-        self.edgeProperties = properties['edgeProperties']
-        self.externalVertexAttr = properties['externalVertexAttr']
-        self.parallelism = self.externalVertexAttr['Parallelism']
-        self.coder = properties['coder']
-    @property
-    def dot(self):
-        color = 'black'
-        try:
-            if self.externalVertexAttr['ContainerType'] == 'Transient':
-                color = 'orange'
-            if self.externalVertexAttr['ContainerType'] == 'Reserved':
-                color = 'green'
-        except:
-            pass
-        label = '{} (p{})<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, self.parallelism, edgePropertiesString(self.edgeProperties), self.coder)
-        return '{} -> {} [ltail = {}, lhead = {}, label = <{}>, color = {}];'.format(self.src.oneVertex.idx,
-                self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label, color)
-
 class StageEdge:
     def __init__(self, src, dst, properties):
         self.src = src.internalDAG.vertices[properties['srcVertex']]
         self.dst = dst.internalDAG.vertices[properties['dstVertex']]
         self.runtimeEdgeId = properties['runtimeEdgeId']
-        self.edgeProperties = properties['edgeProperties']
-        self.coder = properties['coder']
+        self.executionProperties = properties['executionProperties']
+        self.coder = self.executionProperties['Coder']
     @property
     def dot(self):
-        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.edgeProperties), self.coder)
+        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.coder)
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
                 self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label)
 
@@ -366,11 +340,11 @@ def __init__(self, src, dst, properties):
         self.src = src
         self.dst = dst
         self.runtimeEdgeId = properties['runtimeEdgeId']
-        self.edgeProperties = properties['edgeProperties']
-        self.coder = properties['coder']
+        self.executionProperties = properties['executionProperties']
+        self.coder = self.executionProperties['Coder']
     @property
     def dot(self):
-        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.edgeProperties), self.coder)
+        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.coder)
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
                 self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label)
 
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/Coder.java b/common/src/main/java/edu/snu/nemo/common/coder/Coder.java
index c0d9eb41..45071433 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/Coder.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/Coder.java
@@ -21,7 +21,8 @@
 import java.io.Serializable;
 
 /**
- * A {@link Coder Coder&lt;T&gt;} object encodes or decodes values of type {@code T} into byte streams.
+ * A coder object encodes or decodes values of type {@code T} into byte streams.
+ *
  * @param <T> element type.
  */
 public interface Coder<T> extends Serializable {
@@ -31,7 +32,7 @@
    * Because the user can want to keep a single output stream and continuously concatenate elements,
    * the output stream should not be closed.
    *
-   * @param element the element to be encoded
+   * @param element   the element to be encoded
    * @param outStream the stream on which encoded bytes are written
    * @throws IOException if fail to encode
    */
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
index 9dbaabb8..cfb6497f 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.common.ir.edge;
 
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.Edge;
 import edu.snu.nemo.common.ir.IdManager;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
@@ -30,44 +29,42 @@
  */
 public final class IREdge extends Edge<IRVertex> {
   private final ExecutionPropertyMap executionProperties;
-  private final Coder coder;
   private final Boolean isSideInput;
 
   /**
    * Constructor of IREdge.
+   * This constructor assumes that this edge is not for a side input.
+   *
    * @param commPattern data communication pattern type of the edge.
-   * @param src source vertex.
-   * @param dst destination vertex.
-   * @param coder coder.
+   * @param src         source vertex.
+   * @param dst         destination vertex.
    */
   public IREdge(final DataCommunicationPatternProperty.Value commPattern,
                 final IRVertex src,
-                final IRVertex dst,
-                final Coder coder) {
-    this(commPattern, src, dst, coder, false);
+                final IRVertex dst) {
+    this(commPattern, src, dst, false);
   }
 
   /**
    * Constructor of IREdge.
+   *
    * @param commPattern data communication pattern type of the edge.
-   * @param src source vertex.
-   * @param dst destination vertex.
-   * @param coder coder.
+   * @param src         source vertex.
+   * @param dst         destination vertex.
    * @param isSideInput flag for whether or not the edge is a sideInput.
    */
   public IREdge(final DataCommunicationPatternProperty.Value commPattern,
                 final IRVertex src,
                 final IRVertex dst,
-                final Coder coder,
                 final Boolean isSideInput) {
     super(IdManager.newEdgeId(), src, dst);
-    this.coder = coder;
     this.isSideInput = isSideInput;
     this.executionProperties = ExecutionPropertyMap.of(this, commPattern);
   }
 
   /**
    * Set an executionProperty of the IREdge.
+   *
    * @param executionProperty the execution property.
    * @return the IREdge with the execution property set.
    */
@@ -78,7 +75,8 @@ public IREdge setProperty(final ExecutionProperty<?> executionProperty) {
 
   /**
    * Get the executionProperty of the IREdge.
-   * @param <T> Type of the return value.
+   *
+   * @param <T>                  Type of the return value.
    * @param executionPropertyKey key of the execution property.
    * @return the execution property.
    */
@@ -93,13 +91,6 @@ public ExecutionPropertyMap getExecutionProperties() {
     return executionProperties;
   }
 
-  /**
-   * @return coder for the edge.
-   */
-  public Coder getCoder() {
-    return coder;
-  }
-
   /**
    * @return whether or not the edge is a side input edge.
    */
@@ -117,6 +108,7 @@ public Boolean hasSameItineraryAs(final IREdge edge) {
 
   /**
    * Static function to copy executionProperties from an edge to the other.
+   *
    * @param thatEdge the edge to copy executionProperties to.
    */
   public void copyExecutionPropertiesTo(final IREdge thatEdge) {
@@ -132,7 +124,7 @@ public boolean equals(final Object o) {
       return false;
     }
 
-    IREdge irEdge = (IREdge) o;
+    final IREdge irEdge = (IREdge) o;
 
     return executionProperties.equals(irEdge.getExecutionProperties()) && hasSameItineraryAs(irEdge);
   }
@@ -151,8 +143,7 @@ public String propertiesToJSON() {
     final StringBuilder sb = new StringBuilder();
     sb.append("{\"id\": \"").append(getId());
     sb.append("\", \"executionProperties\": ").append(executionProperties);
-    sb.append(", \"coder\": \"").append(coder.toString());
-    sb.append("\"}");
+    sb.append("}");
     return sb.toString();
   }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
new file mode 100644
index 00000000..1d2e5dc6
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.edge.executionproperty;
+
+import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+
+/**
+ * Coder ExecutionProperty.
+ */
+public final class CoderProperty extends ExecutionProperty<Coder> {
+  /**
+   * Constructor.
+   *
+   * @param value value of the execution property.
+   */
+  private CoderProperty(final Coder value) {
+    super(Key.Coder, value);
+  }
+
+  /**
+   * Static method exposing the constructor.
+   *
+   * @param value value of the new execution property.
+   * @return the newly created execution property.
+   */
+  public static CoderProperty of(final Coder value) {
+    return new CoderProperty(value);
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java
index eaf73a65..0387e070 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java
@@ -24,6 +24,7 @@
 public final class KeyExtractorProperty extends ExecutionProperty<KeyExtractor> {
   /**
    * Constructor.
+   *
    * @param value value of the execution property.
    */
   private KeyExtractorProperty(final KeyExtractor value) {
@@ -32,6 +33,7 @@ private KeyExtractorProperty(final KeyExtractor value) {
 
   /**
    * Static method exposing the constructor.
+   *
    * @param value value of the new execution property.
    * @return the newly created execution property.
    */
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java
index ef55daab..3bb4a4f3 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java
@@ -16,6 +16,7 @@
 package edu.snu.nemo.common.ir.executionproperty;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * An abstract class for each execution factors.
@@ -59,6 +60,24 @@ public final Key getKey() {
     };
   }
 
+  @Override
+  public final boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final ExecutionProperty<?> that = (ExecutionProperty<?>) o;
+    return getKey() == that.getKey()
+        && Objects.equals(getValue(), that.getValue());
+  }
+
+  @Override
+  public final int hashCode() {
+    return Objects.hash(getKey(), getValue());
+  }
+
   /**
    * Key for different types of execution property.
    */
@@ -73,6 +92,7 @@ public final Key getKey() {
     UsedDataHandling,
     Compression,
     DuplicateEdgeGroup,
+    Coder,
 
     // Applies to IRVertex
     DynamicOptimizationType,
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
index 75f92cd3..78d153ab 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
@@ -218,7 +218,7 @@ public LoopVertex unRollIteration(final DAGBuilder<IRVertex, IREdge> dagBuilder)
       dagToAdd.getIncomingEdgesOf(irVertex).forEach(edge -> {
         final IRVertex newSrc = originalToNewIRVertex.get(edge.getSrc());
         final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-            newSrc, newIrVertex, edge.getCoder(), edge.isSideInput());
+            newSrc, newIrVertex, edge.isSideInput());
         edge.copyExecutionPropertiesTo(newIrEdge);
         dagBuilder.connectVertices(newIrEdge);
       });
@@ -227,7 +227,7 @@ public LoopVertex unRollIteration(final DAGBuilder<IRVertex, IREdge> dagBuilder)
     // process DAG incoming edges.
     getDagIncomingEdges().forEach((dstVertex, irEdges) -> irEdges.forEach(edge -> {
       final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-          edge.getSrc(), originalToNewIRVertex.get(dstVertex), edge.getCoder(), edge.isSideInput());
+          edge.getSrc(), originalToNewIRVertex.get(dstVertex), edge.isSideInput());
       edge.copyExecutionPropertiesTo(newIrEdge);
       dagBuilder.connectVertices(newIrEdge);
     }));
@@ -236,7 +236,7 @@ public LoopVertex unRollIteration(final DAGBuilder<IRVertex, IREdge> dagBuilder)
       // if termination condition met, we process the DAG outgoing edge.
       getDagOutgoingEdges().forEach((srcVertex, irEdges) -> irEdges.forEach(edge -> {
         final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-            originalToNewIRVertex.get(srcVertex), edge.getDst(), edge.getCoder(), edge.isSideInput());
+            originalToNewIRVertex.get(srcVertex), edge.getDst(), edge.isSideInput());
         edge.copyExecutionPropertiesTo(newIrEdge);
         dagBuilder.addVertex(edge.getDst()).connectVertices(newIrEdge);
       }));
@@ -247,7 +247,7 @@ public LoopVertex unRollIteration(final DAGBuilder<IRVertex, IREdge> dagBuilder)
     this.nonIterativeIncomingEdges.forEach((dstVertex, irEdges) -> irEdges.forEach(this::addDagIncomingEdge));
     this.iterativeIncomingEdges.forEach((dstVertex, irEdges) -> irEdges.forEach(edge -> {
       final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-          originalToNewIRVertex.get(edge.getSrc()), dstVertex, edge.getCoder(), edge.isSideInput());
+          originalToNewIRVertex.get(edge.getSrc()), dstVertex, edge.isSideInput());
       edge.copyExecutionPropertiesTo(newIrEdge);
       this.addDagIncomingEdge(newIrEdge);
     }));
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
index f61adc67..3a7f9bd4 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.compiler.frontend.beam;
 
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.compiler.frontend.beam.coder.BeamCoder;
 import edu.snu.nemo.common.dag.DAGBuilder;
@@ -105,8 +106,8 @@ public void visitPrimitiveTransform(final TransformHierarchy.Node beamNode) {
     beamNode.getInputs().values().stream().filter(pValueToVertex::containsKey)
         .forEach(pValue -> {
           final IRVertex src = pValueToVertex.get(pValue);
-          final BeamCoder coder = pValueToCoder.get(pValue);
-          final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex), src, irVertex, coder);
+          final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex), src, irVertex);
+          edge.setProperty(CoderProperty.of(pValueToCoder.get(pValue)));
           edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
           this.builder.connectVertices(edge);
         });
@@ -200,9 +201,9 @@ private static void connectSideInputs(final DAGBuilder<IRVertex, IREdge> builder
     sideInputs.stream().filter(pValueToVertex::containsKey)
         .forEach(pValue -> {
           final IRVertex src = pValueToVertex.get(pValue);
-          final BeamCoder coder = pValueToCoder.get(pValue);
           final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex),
-              src, irVertex, coder, true);
+              src, irVertex, true);
+          edge.setProperty(CoderProperty.of(pValueToCoder.get(pValue)));
           edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
           builder.connectVertices(edge);
         });
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
index 1cf535a6..8ad00d31 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
@@ -19,6 +19,7 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
@@ -53,6 +54,8 @@
  * Utility class for RDDs.
  */
 public final class SparkFrontendUtils {
+  private static final KeyExtractorProperty SPARK_KEY_EXTRACTOR_PROP = KeyExtractorProperty.of(new SparkKeyExtractor());
+
   /**
    * Private constructor.
    */
@@ -96,8 +99,9 @@ public static Serializer deriveSerializerFrom(final org.apache.spark.SparkContex
     builder.addVertex(collectVertex, loopVertexStack);
 
     final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex, collectVertex),
-        lastVertex, collectVertex, new SparkCoder(serializer));
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor()));
+        lastVertex, collectVertex);
+    newEdge.setProperty(CoderProperty.of(new SparkCoder(serializer)));
+    newEdge.setProperty(SPARK_KEY_EXTRACTOR_PROP);
     builder.connectVertices(newEdge);
 
     // launch DAG
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
index bb1c496d..0b61d7b5 100644
--- a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
@@ -19,7 +19,8 @@ import java.util
 
 import edu.snu.nemo.common.dag.DAGBuilder
 import edu.snu.nemo.common.ir.edge.IREdge
-import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty
+import edu.snu.nemo.common.ir.edge.executionproperty.{CoderProperty, KeyExtractorProperty}
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty
 import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
 import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor
 import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder
@@ -69,7 +70,9 @@ final class PairRDDFunctions[K: ClassTag, V: ClassTag] protected[rdd] (
     builder.addVertex(reduceByKeyVertex, loopVertexStack)
 
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(self.lastVertex, reduceByKeyVertex),
-      self.lastVertex, reduceByKeyVertex, new SparkCoder[Tuple2[K, V]](self.serializer))
+      self.lastVertex, reduceByKeyVertex)
+    newEdge.setProperty(
+      CoderProperty.of(new SparkCoder[Tuple2[K, V]](self.serializer)).asInstanceOf[ExecutionProperty[_]])
     newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
     builder.connectVertices(newEdge)
 
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
index cfa144df..12318c55 100644
--- a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
@@ -20,7 +20,8 @@ import java.util
 import edu.snu.nemo.client.JobLauncher
 import edu.snu.nemo.common.dag.{DAG, DAGBuilder}
 import edu.snu.nemo.common.ir.edge.IREdge
-import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty
+import edu.snu.nemo.common.ir.edge.executionproperty.{CoderProperty, KeyExtractorProperty}
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty
 import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
 import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor
 import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder
@@ -48,8 +49,11 @@ final class RDD[T: ClassTag] protected[rdd] (
     protected[rdd] val lastVertex: IRVertex,
     private val sourceRDD: Option[org.apache.spark.rdd.RDD[T]]) extends org.apache.spark.rdd.RDD[T](_sc, deps) {
 
-  private val loopVertexStack = new util.Stack[LoopVertex]
   protected[rdd] val serializer: Serializer = SparkFrontendUtils.deriveSerializerFrom(_sc)
+  private val loopVertexStack = new util.Stack[LoopVertex]
+  private val coderProperty: ExecutionProperty[_] =
+    CoderProperty.of(new SparkCoder[T](serializer)).asInstanceOf[ExecutionProperty[_]]
+  private val keyExtractorProperty: KeyExtractorProperty = KeyExtractorProperty.of(new SparkKeyExtractor)
 
   /**
    * Constructor without dependencies (not needed in Nemo RDD).
@@ -132,8 +136,9 @@ final class RDD[T: ClassTag] protected[rdd] (
     builder.addVertex(mapVertex, loopVertexStack)
 
     val newEdge: IREdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, mapVertex),
-      lastVertex, mapVertex, new SparkCoder[T](serializer))
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+      lastVertex, mapVertex)
+    newEdge.setProperty(coderProperty)
+    newEdge.setProperty(keyExtractorProperty)
     builder.connectVertices(newEdge)
 
     new RDD[U](_sc, builder.buildWithoutSourceSinkCheck, mapVertex, Option.empty)
@@ -150,8 +155,9 @@ final class RDD[T: ClassTag] protected[rdd] (
     builder.addVertex(flatMapVertex, loopVertexStack)
 
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, flatMapVertex),
-      lastVertex, flatMapVertex, new SparkCoder[T](serializer))
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+      lastVertex, flatMapVertex)
+    newEdge.setProperty(coderProperty)
+    newEdge.setProperty(keyExtractorProperty)
     builder.connectVertices(newEdge)
 
     new RDD[U](_sc, builder.buildWithoutSourceSinkCheck, flatMapVertex, Option.empty)
@@ -179,8 +185,9 @@ final class RDD[T: ClassTag] protected[rdd] (
     builder.addVertex(reduceVertex, loopVertexStack)
 
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, reduceVertex),
-      lastVertex, reduceVertex, new SparkCoder[T](serializer))
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+      lastVertex, reduceVertex)
+    newEdge.setProperty(coderProperty)
+    newEdge.setProperty(keyExtractorProperty)
 
     builder.connectVertices(newEdge)
     ReduceTransform.reduceIterator(
@@ -202,8 +209,9 @@ final class RDD[T: ClassTag] protected[rdd] (
 
     builder.addVertex(flatMapVertex, loopVertexStack)
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, flatMapVertex),
-      lastVertex, flatMapVertex, new SparkCoder[T](serializer))
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+      lastVertex, flatMapVertex)
+    newEdge.setProperty(coderProperty)
+    newEdge.setProperty(keyExtractorProperty)
 
     builder.connectVertices(newEdge)
     JobLauncher.launchDAG(builder.build)
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/CompiletimeOptimizer.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/CompiletimeOptimizer.java
index cadb4435..0213242d 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/CompiletimeOptimizer.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/CompiletimeOptimizer.java
@@ -71,7 +71,7 @@ private CompiletimeOptimizer() {
       if ((passToApply instanceof AnnotatingPass && !checkAnnotatingPass(dag, processedDAG))
           || (passToApply instanceof ReshapingPass && !checkReshapingPass(dag, processedDAG))) {
         throw new CompileTimeOptimizationException(passToApply.getClass().getSimpleName()
-            + "is implemented in a way that doesn't follow its original intention of annotating or reshaping. "
+            + " is implemented in a way that doesn't follow its original intention of annotating or reshaping. "
             + "Modify it or use a general CompileTimePass");
       }
       // Save the processed JSON DAG.
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java
index 6695ec18..8225df22 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/MapReduceDisaggregationOptimization.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.compiler.optimizer.examples;
 
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
@@ -58,12 +57,10 @@ public static void main(final String[] args) throws Exception {
     builder.addVertex(map);
     builder.addVertex(reduce);
 
-    final IREdge edge1 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-        source, map, Coder.DUMMY_CODER);
+    final IREdge edge1 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map);
     builder.connectVertices(edge1);
 
-    final IREdge edge2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
-        map, reduce, Coder.DUMMY_CODER);
+    final IREdge edge2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map, reduce);
     builder.connectVertices(edge2);
 
     final DAG<IRVertex, IREdge> dag = builder.build();
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java
new file mode 100644
index 00000000..d4efdda1
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+
+import java.util.Collections;
+
+/**
+ * Pass for initiating IREdge Coder ExecutionProperty with default dummy coder.
+ */
+public final class DefaultEdgeCoderPass extends AnnotatingPass {
+
+  private static final CoderProperty DEFAULT_CODER_PROPERTY = CoderProperty.of(Coder.DUMMY_CODER);
+
+  /**
+   * Default constructor.
+   */
+  public DefaultEdgeCoderPass() {
+    super(ExecutionProperty.Key.Coder, Collections.emptySet());
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    dag.topologicalDo(irVertex ->
+        dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
+          if (irEdge.getProperty(ExecutionProperty.Key.Coder) == null) {
+            irEdge.setProperty(DEFAULT_CODER_PROPERTY);
+          }
+        }));
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
index d0de39d0..5a5949c2 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
@@ -31,6 +31,7 @@
   public PrimitiveCompositePass() {
     super(Arrays.asList(
         new DefaultParallelismPass(), // annotating after reshaping passes, before stage partitioning
+        new DefaultEdgeCoderPass(),
         new DefaultStagePartitioningPass(),
         new ReviseInterStageEdgeDataStorePass(), // after stage partitioning
         new DefaultEdgeUsedDataHandlingPass(),
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
index 18757ae9..117e1afd 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
@@ -16,6 +16,7 @@
 package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
@@ -147,7 +148,8 @@ private static void mergeAndAddToBuilder(final List<OperatorVertex> ovs, final D
             outEdges.getOrDefault(ov, new HashSet<>()).forEach(e -> {
               outListToModify.remove(e);
               final IREdge newIrEdge = new IREdge(e.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                  operatorVertexToUse, e.getDst(), e.getCoder());
+                  operatorVertexToUse, e.getDst());
+              newIrEdge.setProperty(CoderProperty.of(e.getProperty(ExecutionProperty.Key.Coder)));
               outListToModify.add(newIrEdge);
             });
             outEdges.remove(ov);
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
index 32f8b5dc..88e16d6e 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
@@ -18,6 +18,7 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
@@ -65,10 +66,11 @@ public DataSkewReshapingPass() {
                 .equals(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern))) {
             // We then insert the dynamicOptimizationVertex between the vertex and incoming vertices.
             final IREdge newEdge = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-                edge.getSrc(), metricCollectionBarrierVertex, edge.getCoder());
+                edge.getSrc(), metricCollectionBarrierVertex);
+            newEdge.setProperty(CoderProperty.of(edge.getProperty(ExecutionProperty.Key.Coder)));
 
             final IREdge edgeToGbK = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                metricCollectionBarrierVertex, v, edge.getCoder(), edge.isSideInput());
+                metricCollectionBarrierVertex, v, edge.isSideInput());
             edge.copyExecutionPropertiesTo(edgeToGbK);
             builder.connectVertices(newEdge);
             builder.connectVertices(edgeToGbK);
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
index 02ede778..b913c77c 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
@@ -61,7 +61,7 @@ private Integer findMaxLoopVertexStackDepth(final DAG<IRVertex, IREdge> dag) {
 
   /**
    * This part groups each iteration of loops together by observing the LoopVertex assigned to primitive operators,
-   * which is assigned by the {@link edu.snu.nemo.client.beam.NemoPipelineVisitor}. This also shows in which depth of
+   * which is assigned by the NemoPipelineVisitor. This also shows in which depth of
    * nested loops the function handles. It recursively calls itself from the maximum depth until 0.
    * @param dag DAG to process
    * @param depth the depth of the stack to process. Must be greater than 0.
@@ -100,7 +100,7 @@ private Integer findMaxLoopVertexStackDepth(final DAG<IRVertex, IREdge> dag) {
                 srcLoopVertex.addDagOutgoingEdge(irEdge);
                 final IREdge edgeFromLoop =
                     new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                        srcLoopVertex, operatorVertex, irEdge.getCoder(), irEdge.isSideInput());
+                        srcLoopVertex, operatorVertex, irEdge.isSideInput());
                 irEdge.copyExecutionPropertiesTo(edgeFromLoop);
                 builder.connectVertices(edgeFromLoop);
                 srcLoopVertex.mapEdgeWithLoop(edgeFromLoop, irEdge);
@@ -148,7 +148,7 @@ private static void connectElementToLoop(final DAG<IRVertex, IREdge> dag, final
         } else { // loop -> loop connection
           assignedLoopVertex.addDagIncomingEdge(irEdge);
           final IREdge edgeToLoop = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-              srcLoopVertex, assignedLoopVertex, irEdge.getCoder(), irEdge.isSideInput());
+              srcLoopVertex, assignedLoopVertex, irEdge.isSideInput());
           irEdge.copyExecutionPropertiesTo(edgeToLoop);
           builder.connectVertices(edgeToLoop);
           assignedLoopVertex.mapEdgeWithLoop(edgeToLoop, irEdge);
@@ -156,7 +156,7 @@ private static void connectElementToLoop(final DAG<IRVertex, IREdge> dag, final
       } else { // operator -> loop
         assignedLoopVertex.addDagIncomingEdge(irEdge);
         final IREdge edgeToLoop = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-            irEdge.getSrc(), assignedLoopVertex, irEdge.getCoder(), irEdge.isSideInput());
+            irEdge.getSrc(), assignedLoopVertex, irEdge.isSideInput());
         irEdge.copyExecutionPropertiesTo(edgeToLoop);
         builder.connectVertices(edgeToLoop);
         assignedLoopVertex.mapEdgeWithLoop(edgeToLoop, irEdge);
@@ -227,13 +227,13 @@ private static void connectElementToLoop(final DAG<IRVertex, IREdge> dag, final
 
               // add the new IREdge to the iterative incoming edges list.
               final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                  equivalentSrcVertex, equivalentDstVertex, edge.getCoder(), edge.isSideInput());
+                  equivalentSrcVertex, equivalentDstVertex, edge.isSideInput());
               edge.copyExecutionPropertiesTo(newIrEdge);
               finalRootLoopVertex.addIterativeIncomingEdge(newIrEdge);
             } else {
               // src is from outside the previous loop. vertex outside previous loop -> DAG.
               final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                  srcVertex, equivalentDstVertex, edge.getCoder(), edge.isSideInput());
+                  srcVertex, equivalentDstVertex, edge.isSideInput());
               edge.copyExecutionPropertiesTo(newIrEdge);
               finalRootLoopVertex.addNonIterativeIncomingEdge(newIrEdge);
             }
@@ -246,7 +246,7 @@ private static void connectElementToLoop(final DAG<IRVertex, IREdge> dag, final
             final IRVertex equivalentSrcVertex = equivalentVertices.get(srcVertex);
 
             final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                equivalentSrcVertex, dstVertex, edge.getCoder(), edge.isSideInput());
+                equivalentSrcVertex, dstVertex, edge.isSideInput());
             edge.copyExecutionPropertiesTo(newIrEdge);
             finalRootLoopVertex.addDagOutgoingEdge(newIrEdge);
             finalRootLoopVertex.mapEdgeWithLoop(loopVertex.getEdgeWithLoop(edge), newIrEdge);
@@ -291,7 +291,7 @@ private static void addVertexToBuilder(final DAGBuilder<IRVertex, IREdge> builde
         builder.connectVertices(edge);
       } else {
         final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-            firstEquivalentVertex, irVertex, edge.getCoder(), edge.isSideInput());
+            firstEquivalentVertex, irVertex, edge.isSideInput());
         edge.copyExecutionPropertiesTo(newIrEdge);
         builder.connectVertices(newIrEdge);
       }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
index d2aaeee5..1e5f23f5 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
@@ -16,6 +16,7 @@
 package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.common.dag.DAG;
@@ -159,7 +160,7 @@ public LoopFusionPass() {
             inEdges.getOrDefault(loopVertex, new ArrayList<>()).forEach(irEdge -> {
               if (builder.contains(irEdge.getSrc())) {
                 final IREdge newIREdge = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                    irEdge.getSrc(), newLoopVertex, irEdge.getCoder(), irEdge.isSideInput());
+                    irEdge.getSrc(), newLoopVertex, irEdge.isSideInput());
                 irEdge.copyExecutionPropertiesTo(newIREdge);
                 builder.connectVertices(newIREdge);
               }
@@ -168,7 +169,7 @@ public LoopFusionPass() {
             outEdges.getOrDefault(loopVertex, new ArrayList<>()).forEach(irEdge -> {
               if (builder.contains(irEdge.getDst())) {
                 final IREdge newIREdge = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                    newLoopVertex, irEdge.getDst(), irEdge.getCoder(), irEdge.isSideInput());
+                    newLoopVertex, irEdge.getDst(), irEdge.isSideInput());
                 irEdge.copyExecutionPropertiesTo(newIREdge);
                 builder.connectVertices(newIREdge);
               }
@@ -285,8 +286,10 @@ public LoopInvariantCodeMotionPass() {
               candidate.getValue().stream().map(IREdge::getSrc).anyMatch(edgeSrc -> edgeSrc.equals(e.getSrc())))
               .forEach(edge -> {
                 edgesToRemove.add(edge);
-                edgesToAdd.add(new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                    candidate.getKey(), edge.getDst(), edge.getCoder(), edge.isSideInput()));
+                final IREdge newEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+                    candidate.getKey(), edge.getDst(), edge.isSideInput());
+                newEdge.setProperty(CoderProperty.of(edge.getProperty(ExecutionProperty.Key.Coder)));
+                edgesToAdd.add(newEdge);
               });
           final List<IREdge> listToModify = inEdges.getOrDefault(loopVertex, new ArrayList<>());
           listToModify.removeAll(edgesToRemove);
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
index 77700651..25f941c7 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
@@ -17,6 +17,7 @@
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
@@ -59,10 +60,11 @@ public SailfishRelayReshapingPass() {
             final OperatorVertex iFileMergerVertex = new OperatorVertex(new RelayTransform());
             builder.addVertex(iFileMergerVertex);
             final IREdge newEdgeToMerger = new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
-                edge.getSrc(), iFileMergerVertex, edge.getCoder(), edge.isSideInput());
-            final IREdge newEdgeFromMerger = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-                iFileMergerVertex, v, edge.getCoder());
+                edge.getSrc(), iFileMergerVertex, edge.isSideInput());
             edge.copyExecutionPropertiesTo(newEdgeToMerger);
+            final IREdge newEdgeFromMerger = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
+                iFileMergerVertex, v);
+            newEdgeFromMerger.setProperty(CoderProperty.of(edge.getProperty(ExecutionProperty.Key.Coder)));
             builder.connectVertices(newEdgeToMerger);
             builder.connectVertices(newEdgeFromMerger);
           } else {
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index adaf7528..3e791dcb 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -186,8 +186,7 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
                 irEdge.getId(),
                 irEdge.getExecutionProperties(),
                 irEdge.getSrc(),
-                irEdge.getDst(),
-                irEdge.getCoder()));
+                irEdge.getDst()));
           } else { // edge comes from another stage
             final Stage srcStage = vertexStageMap.get(srcVertex);
 
@@ -201,7 +200,6 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
                 .setSrcVertex(srcVertex)
                 .setDstVertex(dstVertex)
                 .setSrcStage(srcStage)
-                .setCoder(irEdge.getCoder())
                 .setSideInputFlag(irEdge.isSideInput());
             currentStageIncomingEdges.add(newEdgeBuilder);
           }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
index c86d1e7b..528beb80 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.runtime.common.plan;
 
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.Edge;
 import edu.snu.nemo.common.dag.Vertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
@@ -26,69 +25,60 @@
  * @param <V> the vertex type.
  */
 public class RuntimeEdge<V extends Vertex> extends Edge<V> {
-  private final ExecutionPropertyMap edgeProperties;
-  private final Coder coder;
+  private final ExecutionPropertyMap executionProperties;
   private final Boolean isSideInput;
 
   /**
    * Constructs the edge given the below parameters.
-   * @param runtimeEdgeId the id of this edge.
-   * @param edgeProperties to control the data flow on this edge.
-   * @param src the source vertex.
-   * @param dst the destination vertex.
-   * @param coder coder.
+   * This constructor assumes that this edge is not for a side input.
+   *
+   * @param runtimeEdgeId       the id of this edge.
+   * @param executionProperties to control the data flow on this edge.
+   * @param src                 the source vertex.
+   * @param dst                 the destination vertex.
    */
   public RuntimeEdge(final String runtimeEdgeId,
-                     final ExecutionPropertyMap edgeProperties,
+                     final ExecutionPropertyMap executionProperties,
                      final V src,
-                     final V dst,
-                     final Coder coder) {
-    this(runtimeEdgeId, edgeProperties, src, dst, coder, false);
+                     final V dst) {
+    this(runtimeEdgeId, executionProperties, src, dst, false);
   }
 
   /**
    * Constructs the edge given the below parameters.
-   * @param runtimeEdgeId the id of this edge.
-   * @param edgeProperties to control the data flow on this edge.
-   * @param src the source vertex.
-   * @param dst the destination vertex.
-   * @param coder coder.
-   * @param isSideInput Whether or not the RuntimeEdge is a side input edge.
+   *
+   * @param runtimeEdgeId  the id of this edge.
+   * @param executionProperties to control the data flow on this edge.
+   * @param src            the source vertex.
+   * @param dst            the destination vertex.
+   * @param isSideInput    Whether or not the RuntimeEdge is a side input edge.
    */
   public RuntimeEdge(final String runtimeEdgeId,
-                     final ExecutionPropertyMap edgeProperties,
+                     final ExecutionPropertyMap executionProperties,
                      final V src,
                      final V dst,
-                     final Coder coder,
                      final Boolean isSideInput) {
     super(runtimeEdgeId, src, dst);
-    this.edgeProperties = edgeProperties;
-    this.coder = coder;
+    this.executionProperties = executionProperties;
     this.isSideInput = isSideInput;
   }
 
   /**
    * Get the execution property of the Runtime Edge.
-   * @param <T> Type of the return value.
+   *
+   * @param <T>                  Type of the return value.
    * @param executionPropertyKey key of the execution property.
    * @return the execution property.
    */
   public final <T> T getProperty(final ExecutionProperty.Key executionPropertyKey) {
-    return edgeProperties.get(executionPropertyKey);
+    return executionProperties.get(executionPropertyKey);
   }
 
   /**
    * @return the ExecutionPropertyMap of the Runtime Edge.
    */
   public final ExecutionPropertyMap getExecutionProperties() {
-    return edgeProperties;
-  }
-
-  /**
-   * @return the coder for encoding and decoding.
-   */
-  public final Coder getCoder() {
-    return coder;
+    return executionProperties;
   }
 
   /**
@@ -106,9 +96,8 @@ public final Boolean isSideInput() {
   public String propertiesToJSON() {
     final StringBuilder sb = new StringBuilder();
     sb.append("{\"runtimeEdgeId\": \"").append(getId());
-    sb.append("\", \"edgeProperties\": ").append(edgeProperties);
-    sb.append(", \"coder\": \"").append(coder.toString());
-    sb.append("\"}");
+    sb.append("\", \"executionProperties\": ").append(executionProperties);
+    sb.append("}");
     return sb.toString();
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 3b8cdf01..ec73077b 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.runtime.common.plan;
 
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.runtime.common.data.KeyRange;
@@ -48,24 +47,23 @@
 
   /**
    * Constructor.
-   * @param runtimeEdgeId id of the runtime edge.
+   *
+   * @param runtimeEdgeId  id of the runtime edge.
    * @param edgeProperties edge execution properties.
-   * @param srcVertex source IRVertex in the srcStage of this edge.
-   * @param dstVertex destination IRVertex in the dstStage of this edge.
-   * @param srcStage source stage.
-   * @param dstStage destination stage.
-   * @param coder the coder for enconding and deconding.
-   * @param isSideInput whether or not the edge is a sideInput edge.
+   * @param srcVertex      source IRVertex in the srcStage of this edge.
+   * @param dstVertex      destination IRVertex in the dstStage of this edge.
+   * @param srcStage       source stage.
+   * @param dstStage       destination stage.
+   * @param isSideInput    whether or not the edge is a sideInput edge.
    */
-  public StageEdge(final String runtimeEdgeId,
-                   final ExecutionPropertyMap edgeProperties,
-                   final IRVertex srcVertex,
-                   final IRVertex dstVertex,
-                   final Stage srcStage,
-                   final Stage dstStage,
-                   final Coder coder,
-                   final Boolean isSideInput) {
-    super(runtimeEdgeId, edgeProperties, srcStage, dstStage, coder, isSideInput);
+  StageEdge(final String runtimeEdgeId,
+            final ExecutionPropertyMap edgeProperties,
+            final IRVertex srcVertex,
+            final IRVertex dstVertex,
+            final Stage srcStage,
+            final Stage dstStage,
+            final Boolean isSideInput) {
+    super(runtimeEdgeId, edgeProperties, srcStage, dstStage, isSideInput);
     this.srcVertex = srcVertex;
     this.dstVertex = dstVertex;
     // Initialize the key range of each dst task.
@@ -96,7 +94,6 @@ public String propertiesToJSON() {
     sb.append("\", \"edgeProperties\": ").append(getExecutionProperties());
     sb.append(", \"externalSrcVertexId\": \"").append(srcVertex.getId());
     sb.append("\", \"externalDstVertexId\": \"").append(dstVertex.getId());
-    sb.append("\", \"coder\": \"").append(getCoder().toString());
     sb.append("\"}");
     return sb.toString();
   }
@@ -110,6 +107,7 @@ public String propertiesToJSON() {
 
   /**
    * Sets the task idx to key range list.
+   *
    * @param taskIdxToKeyRange the list to set.
    */
   public void setTaskIdxToKeyRange(final List<KeyRange> taskIdxToKeyRange) {
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
index b7448a1c..5630b71f 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
@@ -15,8 +15,6 @@
  */
 package edu.snu.nemo.runtime.common.plan;
 
-
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 
@@ -30,19 +28,20 @@
   private Stage dstStage;
   private IRVertex srcVertex;
   private IRVertex dstVertex;
-  private Coder coder;
   private Boolean isSideInput;
 
   /**
    * Represents the edge between vertices in a logical plan.
+   *
    * @param irEdgeId id of this edge.
    */
-  StageEdgeBuilder(final String irEdgeId) {
+  public StageEdgeBuilder(final String irEdgeId) {
     this.stageEdgeId = irEdgeId;
   }
 
   /**
    * Setter for edge properties.
+   *
    * @param ea the edge properties.
    * @return the updated StageEdgeBuilder.
    */
@@ -53,6 +52,7 @@ public StageEdgeBuilder setEdgeProperties(final ExecutionPropertyMap ea) {
 
   /**
    * Setter for the source stage.
+   *
    * @param ss the source stage.
    * @return the updated StageEdgeBuilder.
    */
@@ -63,6 +63,7 @@ public StageEdgeBuilder setSrcStage(final Stage ss) {
 
   /**
    * Setter for the destination stage.
+   *
    * @param ds the destination stage.
    * @return the updated StageEdgeBuilder.
    */
@@ -73,6 +74,7 @@ public StageEdgeBuilder setDstStage(final Stage ds) {
 
   /**
    * Setter for the source vertex.
+   *
    * @param sv the source vertex.
    * @return the updated StageEdgeBuilder.
    */
@@ -83,6 +85,7 @@ public StageEdgeBuilder setSrcVertex(final IRVertex sv) {
 
   /**
    * Setter for the destination vertex.
+   *
    * @param dv the destination vertex.
    * @return the updated StageEdgeBuilder.
    */
@@ -91,18 +94,9 @@ public StageEdgeBuilder setDstVertex(final IRVertex dv) {
     return this;
   }
 
-  /**
-   * Setter for coder.
-   * @param c the coder.
-   * @return the updated StageEdgeBuilder.
-   */
-  public StageEdgeBuilder setCoder(final Coder c) {
-    this.coder = c;
-    return this;
-  }
-
   /**
    * Setter for side input flag.
+   *
    * @param sideInputFlag the side input flag.
    * @return the updated StageEdgeBuilder.
    */
@@ -111,7 +105,10 @@ public StageEdgeBuilder setSideInputFlag(final Boolean sideInputFlag) {
     return this;
   }
 
+  /**
+   * @return the built stage edge.
+   */
   public StageEdge build() {
-    return new StageEdge(stageEdgeId, edgeProperties, srcVertex, dstVertex, srcStage, dstStage, coder, isSideInput);
+    return new StageEdge(stageEdgeId, edgeProperties, srcVertex, dstVertex, srcStage, dstStage, isSideInput);
   }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 8b2925d5..155a9b07 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -17,6 +17,7 @@
 
 import com.google.protobuf.ByteString;
 import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.exception.IllegalMessageException;
@@ -105,13 +106,13 @@ private void launchTask(final Task task) {
       final TaskStateManager taskStateManager =
           new TaskStateManager(task, executorId, persistentConnectionToMasterMap, metricMessageSender);
 
-      task.getTaskIncomingEdges()
-          .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
-      task.getTaskOutgoingEdges()
-          .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
+      task.getTaskIncomingEdges().forEach(e -> serializerManager.register(e.getId(),
+          e.getProperty(ExecutionProperty.Key.Coder), e.getExecutionProperties()));
+      task.getTaskOutgoingEdges().forEach(e -> serializerManager.register(e.getId(),
+          e.getProperty(ExecutionProperty.Key.Coder), e.getExecutionProperties()));
       irDag.getVertices().forEach(v -> {
-        irDag.getOutgoingEdgesOf(v)
-            .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
+        irDag.getOutgoingEdgesOf(v).forEach(e -> serializerManager.register(e.getId(),
+            e.getProperty(ExecutionProperty.Key.Coder), e.getExecutionProperties()));
       });
 
       new TaskExecutor(
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index f2c0082e..2c4a2acb 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -260,10 +260,9 @@ public void testTwoOperatorsWithSideInput() throws Exception {
                                            final IRVertex dst,
                                            final boolean isSideInput) {
     final String runtimeIREdgeId = "Runtime edge between operator tasks";
-    final Coder coder = Coder.DUMMY_CODER;
     ExecutionPropertyMap edgeProperties = new ExecutionPropertyMap(runtimeIREdgeId);
     edgeProperties.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
-    return new RuntimeEdge<>(runtimeIREdgeId, edgeProperties, src, dst, coder, isSideInput);
+    return new RuntimeEdge<>(runtimeIREdgeId, edgeProperties, src, dst, isSideInput);
 
   }
 
diff --git a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
index 9d4142d7..4888a1fc 100644
--- a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
+++ b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.runtime.plangenerator;
 
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
@@ -136,16 +135,16 @@ private static PhysicalPlan convertIRToPhysical(final DAG<IRVertex, IREdge> irDA
     v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
     dagBuilder.addVertex(v5);
 
-    final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER);
+    final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2);
     dagBuilder.connectVertices(e1);
 
-    final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, Coder.DUMMY_CODER);
+    final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2);
     dagBuilder.connectVertices(e2);
 
-    final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, Coder.DUMMY_CODER);
+    final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4);
     dagBuilder.connectVertices(e3);
 
-    final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v5, Coder.DUMMY_CODER);
+    final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v5);
     dagBuilder.connectVertices(e4);
 
     return dagBuilder.buildWithoutSourceSinkCheck();
@@ -182,10 +181,10 @@ private static PhysicalPlan convertIRToPhysical(final DAG<IRVertex, IREdge> irDA
     }
     dagBuilder.addVertex(v3);
 
-    final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER);
+    final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2);
     dagBuilder.connectVertices(e1);
 
-    final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, Coder.DUMMY_CODER);
+    final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3);
     dagBuilder.connectVertices(e2);
 
     return dagBuilder.buildWithoutSourceSinkCheck();
diff --git a/tests/src/test/java/edu/snu/nemo/tests/common/ir/LoopVertexTest.java b/tests/src/test/java/edu/snu/nemo/tests/common/ir/LoopVertexTest.java
index 03684992..919530dc 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/common/ir/LoopVertexTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/common/ir/LoopVertexTest.java
@@ -54,26 +54,17 @@ public void setUp() {
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
 
     loopDAGBuilder.addVertex(map1).addVertex(groupByKey).addVertex(combine).addVertex(map2)
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
-            map1, groupByKey, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-            groupByKey, combine, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-            combine, map2, Coder.DUMMY_CODER));
-    loopVertex.addDagIncomingEdge(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-        source, map1, Coder.DUMMY_CODER));
-    loopVertex.addIterativeIncomingEdge(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-        map2, map1, Coder.DUMMY_CODER));
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1, groupByKey))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey, combine))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2));
+    loopVertex.addDagIncomingEdge(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1));
+    loopVertex.addIterativeIncomingEdge(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, map2, map1));
 
     originalDAG = builder.addVertex(source).addVertex(map1).addVertex(groupByKey).addVertex(combine).addVertex(map2)
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-            source, map1, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
-            map1, groupByKey, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-            groupByKey, combine, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-            combine, map2, Coder.DUMMY_CODER))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1, groupByKey))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey, combine))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2))
         .build();
   }
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/common/ir/executionproperty/ExecutionPropertyMapTest.java b/tests/src/test/java/edu/snu/nemo/tests/common/ir/executionproperty/ExecutionPropertyMapTest.java
index 8f0c153c..f6f089f2 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/common/ir/executionproperty/ExecutionPropertyMapTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/common/ir/executionproperty/ExecutionPropertyMapTest.java
@@ -17,6 +17,7 @@
 
 import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
@@ -39,8 +40,7 @@
   private final IRVertex source = new EmptyComponents.EmptySourceVertex<>("Source");
   private final IRVertex destination = new OperatorVertex(new EmptyComponents.EmptyTransform("MapElements"));
   private final DataCommunicationPatternProperty.Value comPattern = DataCommunicationPatternProperty.Value.OneToOne;
-  private final IREdge edge = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-      source, destination, Coder.DUMMY_CODER);
+  private final IREdge edge = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, destination);
 
   private ExecutionPropertyMap edgeMap;
   private ExecutionPropertyMap vertexMap;
@@ -65,6 +65,8 @@ public void testPutGetAndRemove() {
     assertEquals(DataStoreProperty.Value.MemoryStore, edgeMap.get(ExecutionProperty.Key.DataStore));
     edgeMap.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
     assertEquals(DataFlowModelProperty.Value.Pull, edgeMap.get(ExecutionProperty.Key.DataFlowModel));
+    edgeMap.put(CoderProperty.of(Coder.DUMMY_CODER));
+    assertEquals(Coder.DUMMY_CODER, edgeMap.get(ExecutionProperty.Key.Coder));
 
     edgeMap.remove(ExecutionProperty.Key.DataFlowModel);
     assertNull(edgeMap.get(ExecutionProperty.Key.DataFlowModel));
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
index ebbc7b6c..cb4b70c9 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
@@ -20,7 +20,6 @@
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.compiler.backend.nemo.NemoBackend;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
@@ -54,12 +53,10 @@
   @Before
   public void setUp() throws Exception {
     this.dag = builder.addVertex(source).addVertex(map1).addVertex(groupByKey).addVertex(combine).addVertex(map2)
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
-            map1, groupByKey, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
-            groupByKey, combine, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2, Coder.DUMMY_CODER))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1, groupByKey))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey, combine))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2))
         .build();
 
     this.dag = CompiletimeOptimizer.optimize(dag, new PadoPolicy(), EMPTY_DAG_DIRECTORY);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
new file mode 100644
index 00000000..6eec0266
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.tests.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.client.JobLauncher;
+import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeCoderPass;
+import edu.snu.nemo.tests.compiler.CompilerTestUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test {@link edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeCoderPass}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public class DefaultEdgeCoderPassTest {
+  private DAG<IRVertex, IREdge> compiledDAG;
+
+  @Before
+  public void setUp() throws Exception {
+    compiledDAG = CompilerTestUtil.compileMRDAG();
+  }
+
+  @Test
+  public void testAnnotatingPass() {
+    final AnnotatingPass coderPass = new DefaultEdgeCoderPass();
+    assertEquals(ExecutionProperty.Key.Coder, coderPass.getExecutionPropertyToModify());
+  }
+
+  @Test
+  public void testNotOverride() {
+    // Get the first coder from the compiled DAG
+    final Coder compiledCoder = compiledDAG
+        .getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0)).get(0).getProperty(ExecutionProperty.Key.Coder);
+    final DAG<IRVertex, IREdge> processedDAG = new DefaultEdgeCoderPass().apply(compiledDAG);
+
+    // Get the first coder from the processed DAG
+    final Coder processedCoder = processedDAG
+        .getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0)).get(0).getProperty(ExecutionProperty.Key.Coder);
+    assertEquals(compiledCoder, processedCoder); // It must not be changed.
+  }
+
+  @Test
+  public void testSetToDefault() throws Exception {
+    // Remove the first coder from the compiled DAG (so that our pass sets it to the default coder).
+    compiledDAG.getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0))
+        .get(0).getExecutionProperties().remove(ExecutionProperty.Key.Coder);
+    final DAG<IRVertex, IREdge> processedDAG = new DefaultEdgeCoderPass().apply(compiledDAG);
+
+    // Check whether the pass set the empty coder to our default coder.
+    final Coder processedCoder = processedDAG
+        .getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0)).get(0).getProperty(ExecutionProperty.Key.Coder);
+    assertEquals(Coder.DUMMY_CODER, processedCoder);
+  }
+}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPassTest.java
index 2be50db3..6c8a52a0 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPassTest.java
@@ -38,7 +38,7 @@
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 public class DefaultParallelismPassTest {
-  DAG<IRVertex, IREdge> compiledDAG;
+  private DAG<IRVertex, IREdge> compiledDAG;
 
   @Before
   public void setUp() throws Exception {
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPassTest.java
index d6f63538..28fc29f0 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPassTest.java
@@ -16,7 +16,6 @@
 package edu.snu.nemo.tests.compiler.optimizer.pass.compiletime.reshaping;
 
 import edu.snu.nemo.client.JobLauncher;
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
@@ -58,16 +57,16 @@ public void setUp() {
     final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
     dagNotToOptimize = dagBuilder.addVertex(source).addVertex(map1).addVertex(groupByKey).addVertex(combine)
         .addVertex(map2)
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1, groupByKey, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey, combine, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2, Coder.DUMMY_CODER))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1, groupByKey))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey, combine))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine, map2))
         .build();
     dagToOptimize = dagBuilder.addVertex(map1clone).addVertex(groupByKey2).addVertex(combine2).addVertex(map22)
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1clone, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1clone, groupByKey2, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey2, combine2, Coder.DUMMY_CODER))
-        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine2, map22, Coder.DUMMY_CODER))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, map1clone))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.Shuffle, map1clone, groupByKey2))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, groupByKey2, combine2))
+        .connectVertices(new IREdge(DataCommunicationPatternProperty.Value.OneToOne, combine2, map22))
         .build();
   }
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
index e670c4db..159feba4 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
@@ -19,6 +19,7 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
@@ -101,7 +102,8 @@ public void setUp() throws Exception {
    * This method adds a LoopVertex at the end of the DAG (no more outgoing edges), after the
    * {@param vertexToBeFollowed}. We assume, as in the MLR, ALS DAG, that iterative incoming edges work to receive
    * main inputs, and non-iterative incoming edges work to receive side inputs.
-   * @param builder builder to add the LoopVertex to.
+   *
+   * @param builder            builder to add the LoopVertex to.
    * @param vertexToBeFollowed vertex that is to be followed by the LoopVertex.
    * @param loopVertexToFollow the new LoopVertex that will be added.
    */
@@ -111,12 +113,14 @@ private static void addLoopVertexToBuilder(final DAGBuilder<IRVertex, IREdge> bu
     builder.addVertex(loopVertexToFollow);
     loopVertexToFollow.getIterativeIncomingEdges().values().forEach(irEdges -> irEdges.forEach(irEdge -> {
       final IREdge newIREdge = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-          vertexToBeFollowed, loopVertexToFollow, irEdge.getCoder());
+          vertexToBeFollowed, loopVertexToFollow);
+      newIREdge.setProperty(CoderProperty.of(irEdge.getProperty(ExecutionProperty.Key.Coder)));
       builder.connectVertices(newIREdge);
     }));
     loopVertexToFollow.getNonIterativeIncomingEdges().values().forEach(irEdges -> irEdges.forEach(irEdge -> {
       final IREdge newIREdge = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-          irEdge.getSrc(), loopVertexToFollow, irEdge.getCoder());
+          irEdge.getSrc(), loopVertexToFollow);
+      newIREdge.setProperty(CoderProperty.of(irEdge.getProperty(ExecutionProperty.Key.Coder)));
       builder.connectVertices(newIREdge);
     }));
   }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
index 4df2fbdb..18feb790 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
@@ -19,6 +19,7 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
@@ -87,11 +88,11 @@ public void setUp() throws Exception {
           if (!e.getSrc().equals(vertex7)) {
             builder.connectVertices(e);
           } else {
-            final Optional<IREdge> theIncomingEdge = newDAGIncomingEdge.stream().findFirst();
-            assertTrue(theIncomingEdge.isPresent());
-            final IREdge newIREdge =
-                new IREdge(theIncomingEdge.get().getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                    theIncomingEdge.get().getSrc(), alsLoop, theIncomingEdge.get().getCoder());
+            final Optional<IREdge> incomingEdge = newDAGIncomingEdge.stream().findFirst();
+            assertTrue(incomingEdge.isPresent());
+            final IREdge newIREdge = new IREdge(incomingEdge.get().getProperty(
+                ExecutionProperty.Key.DataCommunicationPattern), incomingEdge.get().getSrc(), alsLoop);
+            newIREdge.setProperty(CoderProperty.of(incomingEdge.get().getProperty(ExecutionProperty.Key.Coder)));
             builder.connectVertices(newIREdge);
           }
         });
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
index 287e15ba..793c1827 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -29,21 +29,21 @@
   @Test
   public void testDisaggregationPolicy() {
     final Policy disaggregationPolicy = new DisaggregationPolicy();
-    assertEquals(12, disaggregationPolicy.getCompileTimePasses().size());
+    assertEquals(13, disaggregationPolicy.getCompileTimePasses().size());
     assertEquals(0, disaggregationPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testPadoPolicy() {
     final Policy padoPolicy = new PadoPolicy();
-    assertEquals(14, padoPolicy.getCompileTimePasses().size());
+    assertEquals(15, padoPolicy.getCompileTimePasses().size());
     assertEquals(0, padoPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testDataSkewPolicy() {
     final Policy dataSkewPolicy = new DataSkewPolicy();
-    assertEquals(16, dataSkewPolicy.getCompileTimePasses().size());
+    assertEquals(17, dataSkewPolicy.getCompileTimePasses().size());
     assertEquals(1, dataSkewPolicy.getRuntimePasses().size());
   }
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
index f948589f..677b2e97 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
@@ -72,7 +72,7 @@ public void testSimplePlan() throws Exception {
     v2.setProperty(ParallelismProperty.of(2));
     irDAGBuilder.addVertex(v2);
 
-    final IREdge e = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER);
+    final IREdge e = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2);
     irDAGBuilder.connectVertices(e);
 
     final DAG<IRVertex, IREdge> irDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
@@ -155,27 +155,27 @@ public void testComplexPlan() throws Exception {
     // TODO #13: Implement Join Node
 //    irDAGBuilder.addVertex(v7);
 
-    final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v2, Coder.DUMMY_CODER);
+    final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v2);
     e1.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
     e1.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
 
-    final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v3, Coder.DUMMY_CODER);
+    final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v1, v3);
     e2.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
     e2.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
 
-    final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, Coder.DUMMY_CODER);
+    final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4);
     e3.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
     e3.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push));
 
-    final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v5, Coder.DUMMY_CODER);
+    final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v5);
     e4.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
     e4.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push));
 
-    final IREdge e5 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v6, Coder.DUMMY_CODER);
+    final IREdge e5 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v6);
     e5.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
     e5.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
 
-    final IREdge e6 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v8, Coder.DUMMY_CODER);
+    final IREdge e6 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v8);
     e6.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
     e6.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index 697122e9..2a8a224e 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@ -39,6 +39,7 @@
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
+import edu.snu.nemo.runtime.common.plan.StageEdgeBuilder;
 import edu.snu.nemo.runtime.executor.Executor;
 import edu.snu.nemo.runtime.executor.MetricManagerWorker;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
@@ -308,22 +309,30 @@ private void writeAndRead(final BlockManagerWorker sender,
     final IRVertex dstVertex = verticesPair.right();
 
     // Edge setup
-    final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex, CODER);
+    final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex);
+    dummyIREdge.setProperty(CoderProperty.of(CODER));
     dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
     final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
     edgeProperties.put(DataCommunicationPatternProperty.of(commPattern));
     edgeProperties.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
-
     edgeProperties.put(DataStoreProperty.of(store));
     edgeProperties.put(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
+    edgeProperties.put(CoderProperty.of(CODER));
     final RuntimeEdge dummyEdge;
 
     final IRVertex srcMockVertex = mock(IRVertex.class);
     final IRVertex dstMockVertex = mock(IRVertex.class);
     final Stage srcStage = setupStages("srcStage-" + testIndex);
     final Stage dstStage = setupStages("dstStage-" + testIndex);
-    dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
-        srcStage, dstStage, CODER, false);
+    dummyEdge = new StageEdgeBuilder(edgeId)
+        .setEdgeProperties(edgeProperties)
+        .setSrcVertex(srcMockVertex)
+        .setDstVertex(dstMockVertex)
+        .setSrcStage(srcStage)
+        .setDstStage(dstStage)
+        .setSideInputFlag(false)
+        .build();
+
     // Initialize states in Master
     srcStage.getTaskIds().forEach(srcTaskId -> {
       final String blockId = RuntimeIdGenerator.generateBlockId(
@@ -391,7 +400,8 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
     final IRVertex dstVertex = verticesPair.right();
 
     // Edge setup
-    final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex, CODER);
+    final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex);
+    dummyIREdge.setProperty(CoderProperty.of(CODER));
     dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
     final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
     edgeProperties.put(DataCommunicationPatternProperty.of(commPattern));
@@ -409,12 +419,24 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
     final IRVertex dstMockVertex = mock(IRVertex.class);
     final Stage srcStage = setupStages("srcStage-" + testIndex);
     final Stage dstStage = setupStages("dstStage-" + testIndex);
-    dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
-        srcStage, dstStage, CODER, false);
+    dummyEdge = new StageEdgeBuilder(edgeId)
+        .setEdgeProperties(edgeProperties)
+        .setSrcVertex(srcMockVertex)
+        .setDstVertex(dstMockVertex)
+        .setSrcStage(srcStage)
+        .setDstStage(dstStage)
+        .setSideInputFlag(false)
+        .build();
     final IRVertex dstMockVertex2 = mock(IRVertex.class);
     final Stage dstStage2 = setupStages("dstStage-" + testIndex2);
-    dummyEdge2 = new StageEdge(edgeId2, edgeProperties, srcMockVertex, dstMockVertex2,
-        srcStage, dstStage2, CODER, false);
+    dummyEdge2 = new StageEdgeBuilder(edgeId2)
+        .setEdgeProperties(edgeProperties)
+        .setSrcVertex(srcMockVertex)
+        .setDstVertex(dstMockVertex2)
+        .setSrcStage(srcStage)
+        .setDstStage(dstStage2)
+        .setSideInputFlag(false)
+        .build();
     // Initialize states in Master
     srcStage.getTaskIds().forEach(srcTaskId -> {
       final String blockId = RuntimeIdGenerator.generateBlockId(


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services