You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/06/19 05:04:06 UTC

[GitHub] seojangho closed pull request #48: [NEMO-72] Instance-based Encoder/Decoder interface

seojangho closed pull request #48: [NEMO-72] Instance-based Encoder/Decoder interface
URL: https://github.com/apache/incubator-nemo/pull/48
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bin/json2dot.py b/bin/json2dot.py
index 20fa1774..6f2d3398 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -305,7 +305,8 @@ def __init__(self, src, dst, properties):
         self.dst = dst
         self.id = properties['id']
         self.executionProperties = properties['executionProperties']
-        self.coder = self.executionProperties['Coder']
+        self.encoderFactory = self.executionProperties['Encoder']
+        self.decoderFactory = self.executionProperties['Decoder']
     @property
     def dot(self):
         src = self.src
@@ -318,7 +319,7 @@ def dot(self):
             dst = dst.internalDstFor(self.id)
         except:
             pass
-        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.id, edgePropertiesString(self.executionProperties), self.coder)
+        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}<BR/>{}</FONT>'.format(self.id, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory)
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(src.oneVertex.idx,
                 dst.oneVertex.idx, src.logicalEnd, dst.logicalEnd, label)
 
@@ -328,10 +329,11 @@ def __init__(self, src, dst, properties):
         self.dst = dst.internalDAG.vertices[properties['dstVertex']]
         self.runtimeEdgeId = properties['runtimeEdgeId']
         self.executionProperties = properties['executionProperties']
-        self.coder = self.executionProperties['Coder']
+        self.encoderFactory = self.executionProperties['Encoder']
+        self.decoderFactory = self.executionProperties['Decoder']
     @property
     def dot(self):
-        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.coder)
+        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}<BR/>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory)
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
                 self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label)
 
@@ -341,10 +343,11 @@ def __init__(self, src, dst, properties):
         self.dst = dst
         self.runtimeEdgeId = properties['runtimeEdgeId']
         self.executionProperties = properties['executionProperties']
-        self.coder = self.executionProperties['Coder']
+        self.encoderFactory = self.executionProperties['Encoder']
+        self.decoderFactory = self.executionProperties['Decoder']
     @property
     def dot(self):
-        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.coder)
+        label = '{}<BR/>{}<BR/><FONT POINT-SIZE=\'10\'>{}<BR/>{}</FONT>'.format(self.runtimeEdgeId, edgePropertiesString(self.executionProperties), self.encoderFactory, self.decoderFactory)
         return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(self.src.oneVertex.idx,
                 self.dst.oneVertex.idx, self.src.logicalEnd, self.dst.logicalEnd, label)
 
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/BytesCoder.java b/common/src/main/java/edu/snu/nemo/common/coder/BytesCoder.java
deleted file mode 100644
index 3e467f50..00000000
--- a/common/src/main/java/edu/snu/nemo/common/coder/BytesCoder.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.common.coder;
-
-import java.io.*;
-
-/**
- * A {@link Coder} which is used for an array of bytes.
- */
-public final class BytesCoder implements Coder<byte[]> {
-
-  /**
-   * Constructor.
-   */
-  public BytesCoder() {
-  }
-
-  @Override
-  public void encode(final byte[] value, final OutputStream outStream) throws IOException {
-    try (final DataOutputStream dataOutputStream = new DataOutputStream(outStream)) {
-      dataOutputStream.writeInt(value.length); // Write the size of this byte array.
-      dataOutputStream.write(value);
-    }
-  }
-
-  @Override
-  public byte[] decode(final InputStream inStream) throws IOException {
-    // If the inStream is closed well in upper level, it is okay to not close this stream
-    // because the DataInputStream itself will not contain any extra information.
-    // (when we close this stream, the inStream will be closed together.)
-    final DataInputStream dataInputStream = new DataInputStream(inStream);
-    final int bytesToRead = dataInputStream.readInt();
-    final byte[] bytes = new byte[bytesToRead]; // Read the size of this byte array.
-    final int readBytes = dataInputStream.read(bytes);
-    if (bytesToRead != readBytes) {
-      throw new IOException("Have to read " + bytesToRead + " but read only " + readBytes + " bytes.");
-    }
-    return bytes;
-  }
-}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/Coder.java b/common/src/main/java/edu/snu/nemo/common/coder/Coder.java
deleted file mode 100644
index 49c56d18..00000000
--- a/common/src/main/java/edu/snu/nemo/common/coder/Coder.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.common.coder;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-/**
- * A coder object encodes or decodes values of type {@code T} into byte streams.
- *
- * @param <T> element type.
- */
-public interface Coder<T> extends Serializable {
-  /**
-   * Encodes the given value onto the specified output stream.
-   * It have to be able to encode the given stream consequently by calling this method repeatedly.
-   * Because the user can want to keep a single output stream and continuously concatenate elements,
-   * the output stream should not be closed.
-   *
-   * @param element   the element to be encoded
-   * @param outStream the stream on which encoded bytes are written
-   * @throws IOException if fail to encode
-   */
-  void encode(T element, OutputStream outStream) throws IOException;
-
-  /**
-   * Decodes the a value from the given input stream.
-   * It have to be able to decode the given stream consequently by calling this method repeatedly.
-   * Because there are many elements in the input stream, the stream should not be closed.
-   *
-   * @param inStream the stream from which bytes are read
-   * @return the decoded element
-   * @throws IOException if fail to decode
-   */
-  T decode(InputStream inStream) throws IOException;
-
-  /**
-   * Dummy coder.
-   */
-  Coder DUMMY_CODER = new DummyCoder();
-
-  /**
-   * Dummy coder implementation which is not supposed to be used.
-   */
-  final class DummyCoder implements Coder {
-
-    @Override
-    public void encode(final Object value, final OutputStream outStream) {
-      throw new RuntimeException("DummyCoder is not supposed to be used.");
-    }
-
-    @Override
-    public Object decode(final InputStream inStream) {
-      throw new RuntimeException("DummyCoder is not supposed to be used.");
-    }
-
-    @Override
-    public String toString() {
-      return "DUMMY_CODER";
-    }
-  }
-}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
new file mode 100644
index 00000000..a93d8437
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+
+/**
+ * A decoder factory object which generates decoders that decode values of type {@code T} into byte streams.
+ * To avoid to generate instance-based coder such as Spark serializer for every decoding,
+ * user need to instantiate a decoder instance and use it.
+ *
+ * @param <T> element type.
+ */
+public interface DecoderFactory<T> extends Serializable {
+
+  /**
+   * Get a decoder instance.
+   *
+   * @param inputStream the input stream to decode.
+   * @return the decoder instance.
+   * @throws IOException if fail to get the instance.
+   */
+  Decoder<T> create(InputStream inputStream) throws IOException;
+
+  /**
+   * Interface of Decoder.
+   *
+   * @param <T> element type.
+   */
+  interface Decoder<T> extends Serializable {
+
+    /**
+     * Decodes the a value from the given input stream.
+     * It have to be able to decode the given stream consequently by calling this method repeatedly.
+     * Because there are many elements in the input stream, the stream should not be closed.
+     *
+     * @return the decoded element
+     * @throws IOException if fail to decode
+     */
+    T decode() throws IOException;
+  }
+
+  /**
+   * Dummy coder factory.
+   */
+  DecoderFactory DUMMY_DECODER_FACTORY = new DummyDecoderFactory();
+
+  /**
+   * Dummy coder factory implementation which is not supposed to be used.
+   */
+  final class DummyDecoderFactory implements DecoderFactory {
+
+    private final Decoder dummyDecoder = new DummyDecoder();
+
+    /**
+     * DummyDecoder.
+     */
+    private final class DummyDecoder implements Decoder {
+
+      @Override
+      public Object decode() {
+        throw new RuntimeException("DummyDecoder is not supposed to be used.");
+      }
+    }
+
+    @Override
+    public Decoder create(final InputStream inputStream) {
+      return dummyDecoder;
+    }
+
+    @Override
+    public String toString() {
+      return "DUMMY_DECODER_FACTORY";
+    }
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
new file mode 100644
index 00000000..d63fafb9
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * A encoder factory object which generates encoders that encode values of type {@code T} into byte streams.
+ * To avoid to generate instance-based coder such as Spark serializer for every encoding,
+ * user need to explicitly instantiate an encoder instance and use it.
+ *
+ * @param <T> element type.
+ */
+public interface EncoderFactory<T> extends Serializable {
+
+  /**
+   * Get an encoder instance.
+   *
+   * @param outputStream the stream on which encoded bytes are written
+   * @return the encoder instance.
+   * @throws IOException if fail to get the instance.
+   */
+  Encoder<T> create(OutputStream outputStream) throws IOException;
+
+  /**
+   * Interface of Encoder.
+   *
+   * @param <T> element type.
+   */
+  interface Encoder<T> extends Serializable {
+
+    /**
+     * Encodes the given value onto the specified output stream.
+     * It have to be able to encode the given stream consequently by calling this method repeatedly.
+     * Because the user can want to keep a single output stream and continuously concatenate elements,
+     * the output stream should not be closed.
+     *
+     * @param element the element to be encoded
+     * @throws IOException if fail to encode
+     */
+    void encode(T element) throws IOException;
+  }
+
+  /**
+   * Dummy encoder factory.
+   */
+  EncoderFactory DUMMY_ENCODER_FACTORY = new DummyEncoderFactory();
+
+  /**
+   * Dummy encoder factory implementation which is not supposed to be used.
+   */
+  final class DummyEncoderFactory implements EncoderFactory {
+
+    private final Encoder dummyEncoder = new DummyEncoder();
+
+    /**
+     * DummyEncoder.
+     */
+    private final class DummyEncoder implements Encoder {
+
+      @Override
+      public void encode(final Object element) {
+        throw new RuntimeException("DummyEncoder is not supposed to be used.");
+      }
+    }
+
+    @Override
+    public Encoder create(final OutputStream outputStream) {
+      return dummyEncoder;
+    }
+
+    @Override
+    public String toString() {
+      return "DUMMY_ENCODER_FACTORY";
+    }
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java b/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java
deleted file mode 100644
index face9942..00000000
--- a/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.common.coder;
-
-import java.io.*;
-
-/**
- * A {@link Coder} which is used for an integer.
- */
-public final class IntCoder implements Coder<Integer> {
-
-  /**
-   * A private constructor.
-   */
-  private IntCoder() {
-  }
-
-  /**
-   * Static initializer of the coder.
-   */
-  public static IntCoder of() {
-    return new IntCoder();
-  }
-
-  @Override
-  public void encode(final Integer value, final OutputStream outStream) throws IOException {
-    final DataOutputStream dataOutputStream = new DataOutputStream(outStream);
-    dataOutputStream.writeInt(value);
-  }
-
-  @Override
-  public Integer decode(final InputStream inStream) throws IOException {
-    // If the inStream is closed well in upper level, it is okay to not close this stream
-    // because the DataInputStream itself will not contain any extra information.
-    // (when we close this stream, the inStream will be closed together.)
-    final DataInputStream dataInputStream = new DataInputStream(inStream);
-    return dataInputStream.readInt();
-  }
-}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/IntDecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/IntDecoderFactory.java
new file mode 100644
index 00000000..d79ec35c
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/IntDecoderFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.*;
+
+/**
+ * A {@link DecoderFactory} which is used for an integer.
+ */
+public final class IntDecoderFactory implements DecoderFactory<Integer> {
+
+  private static final IntDecoderFactory INT_DECODER_FACTORY = new IntDecoderFactory();
+
+  /**
+   * A private constructor.
+   */
+  private IntDecoderFactory() {
+    // do nothing.
+  }
+
+  /**
+   * Static initializer of the coder.
+   */
+  public static IntDecoderFactory of() {
+    return INT_DECODER_FACTORY;
+  }
+
+  @Override
+  public Decoder<Integer> create(final InputStream inputStream) {
+    return new IntDecoder(inputStream);
+  }
+
+  /**
+   * IntDecoder.
+   */
+  private final class IntDecoder implements Decoder<Integer> {
+
+    private final DataInputStream inputStream;
+
+    /**
+     * Constructor.
+     *
+     * @param inputStream  the input stream to decode.
+     */
+    private IntDecoder(final InputStream inputStream) {
+      // If the inputStream is closed well in upper level, it is okay to not close this stream
+      // because the DataInputStream itself will not contain any extra information.
+      // (when we close this stream, the input will be closed together.)
+      this.inputStream = new DataInputStream(inputStream);
+    }
+
+    @Override
+    public Integer decode() throws IOException {
+      return inputStream.readInt();
+    }
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/IntEncoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/IntEncoderFactory.java
new file mode 100644
index 00000000..0a40de56
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/IntEncoderFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.*;
+
+/**
+ * A {@link EncoderFactory} which is used for an integer.
+ */
+public final class IntEncoderFactory implements EncoderFactory<Integer> {
+
+  private static final IntEncoderFactory INT_ENCODER_FACTORY = new IntEncoderFactory();
+
+  /**
+   * A private constructor.
+   */
+  private IntEncoderFactory() {
+    // do nothing.
+  }
+
+  /**
+   * Static initializer of the coder.
+   */
+  public static IntEncoderFactory of() {
+    return INT_ENCODER_FACTORY;
+  }
+
+  @Override
+  public Encoder<Integer> create(final OutputStream outputStream) {
+    return new IntEncoder(outputStream);
+  }
+
+  /**
+   * IntEncoder.
+   */
+  private final class IntEncoder implements Encoder<Integer> {
+
+    private final DataOutputStream outputStream;
+
+    /**
+     * Constructor.
+     *
+     * @param outputStream the output stream to store the encoded bytes.
+     */
+    private IntEncoder(final OutputStream outputStream) {
+      // If the outputStream is closed well in upper level, it is okay to not close this stream
+      // because the DataOutputStream itself will not contain any extra information.
+      // (when we close this stream, the output will be closed together.)
+      this.outputStream = new DataOutputStream(outputStream);
+    }
+
+    @Override
+    public void encode(final Integer value) throws IOException {
+      outputStream.writeInt(value);
+    }
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java b/common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java
deleted file mode 100644
index 3ad3552a..00000000
--- a/common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.common.coder;
-
-import edu.snu.nemo.common.Pair;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A Coder for {@link edu.snu.nemo.common.Pair}. Reference: KvCoder in BEAM.
- * @param <A> type for the left coder.
- * @param <B> type for the right coder.
- */
-public final class PairCoder<A, B> implements Coder<Pair<A, B>> {
-  private final Coder<A> leftCoder;
-  private final Coder<B> rightCoder;
-
-  /**
-   * Private constructor of PairCoder class.
-   * @param leftCoder coder for right element.
-   * @param rightCoder coder for right element.
-   */
-  private PairCoder(final Coder<A> leftCoder, final Coder<B> rightCoder) {
-    this.leftCoder = leftCoder;
-    this.rightCoder = rightCoder;
-  }
-
-  /**
-   * static initializer of the class.
-   * @param leftCoder left coder.
-   * @param rightCoder right coder.
-   * @param <A> type of the left element.
-   * @param <B> type of the right element.
-   * @return the new PairCoder.
-   */
-  public static <A, B> PairCoder<A, B> of(final Coder<A> leftCoder, final Coder<B> rightCoder) {
-    return new PairCoder<>(leftCoder, rightCoder);
-  }
-
-  /**
-   * @return the left coder.
-   */
-  Coder<A> getLeftCoder() {
-    return leftCoder;
-  }
-
-  /**
-   * @return the right coder.
-   */
-  Coder<B> getRightCoder() {
-    return rightCoder;
-  }
-
-  @Override
-  public void encode(final Pair<A, B> pair, final OutputStream outStream) throws IOException {
-    if (pair == null) {
-      throw new IOException("cannot encode a null pair");
-    }
-    leftCoder.encode(pair.left(), outStream);
-    rightCoder.encode(pair.right(), outStream);
-  }
-
-  @Override
-  public Pair<A, B> decode(final InputStream inStream) throws IOException {
-    final A key = leftCoder.decode(inStream);
-    final B value = rightCoder.decode(inStream);
-    return Pair.of(key, value);
-  }
-}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/PairDecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/PairDecoderFactory.java
new file mode 100644
index 00000000..9cd0fb17
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/PairDecoderFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import edu.snu.nemo.common.Pair;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An DecoderFactory for {@link Pair}. Reference: KvCoder in BEAM.
+ * @param <A> type for the left coder.
+ * @param <B> type for the right coder.
+ */
+public final class PairDecoderFactory<A, B> implements DecoderFactory<Pair<A, B>> {
+  private final DecoderFactory<A> leftDecoderFactory;
+  private final DecoderFactory<B> rightDecoderFactory;
+
+  /**
+   * Private constructor of PairDecoderFactory class.
+   *
+   * @param leftDecoderFactory  coder for right element.
+   * @param rightDecoderFactory coder for right element.
+   */
+  private PairDecoderFactory(final DecoderFactory<A> leftDecoderFactory,
+                             final DecoderFactory<B> rightDecoderFactory) {
+    this.leftDecoderFactory = leftDecoderFactory;
+    this.rightDecoderFactory = rightDecoderFactory;
+  }
+
+  /**
+   * static initializer of the class.
+   *
+   * @param leftDecoderFactory  left coder.
+   * @param rightDecoderFactory right coder.
+   * @param <A>          type of the left element.
+   * @param <B>          type of the right element.
+   * @return the new PairDecoderFactory.
+   */
+  public static <A, B> PairDecoderFactory<A, B> of(final DecoderFactory<A> leftDecoderFactory,
+                                                   final DecoderFactory<B> rightDecoderFactory) {
+    return new PairDecoderFactory<>(leftDecoderFactory, rightDecoderFactory);
+  }
+
+  @Override
+  public Decoder<Pair<A, B>> create(final InputStream inputStream) throws IOException {
+    return new PairDecoder<>(inputStream, leftDecoderFactory, rightDecoderFactory);
+  }
+
+  /**
+   * PairDecoder.
+   * @param <T1> type for the left coder.
+   * @param <T2> type for the right coder.
+   */
+  private final class PairDecoder<T1, T2> implements Decoder<Pair<T1, T2>> {
+
+    private final Decoder<T1> leftDecoder;
+    private final Decoder<T2> rightDecoder;
+
+    /**
+     * Constructor.
+     *
+     * @param inputStream  the input stream to decode.
+     * @param leftDecoderFactory  the actual decoder to use for left elements.
+     * @param rightDecoderFactory the actual decoder to use for right elements.
+     * @throws IOException if fail to instantiate coders.
+     */
+    private PairDecoder(final InputStream inputStream,
+                        final DecoderFactory<T1> leftDecoderFactory,
+                        final DecoderFactory<T2> rightDecoderFactory) throws IOException {
+      this.leftDecoder = leftDecoderFactory.create(inputStream);
+      this.rightDecoder = rightDecoderFactory.create(inputStream);
+    }
+
+    @Override
+    public Pair<T1, T2> decode() throws IOException {
+      final T1 key = leftDecoder.decode();
+      final T2 value = rightDecoder.decode();
+      return Pair.of(key, value);
+    }
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/PairEncoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/PairEncoderFactory.java
new file mode 100644
index 00000000..0837b579
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/PairEncoderFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import edu.snu.nemo.common.Pair;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An EncoderFactory for {@link Pair}. Reference: KvCoder in BEAM.
+ * @param <A> type for the left coder.
+ * @param <B> type for the right coder.
+ */
+public final class PairEncoderFactory<A, B> implements EncoderFactory<Pair<A, B>> {
+  private final EncoderFactory<A> leftEncoderFactory;
+  private final EncoderFactory<B> rightEncoderFactory;
+
+  /**
+   * Private constructor of PairEncoderFactory class.
+   *
+   * @param leftEncoderFactory  coder for right element.
+   * @param rightEncoderFactory coder for right element.
+   */
+  private PairEncoderFactory(final EncoderFactory<A> leftEncoderFactory,
+                             final EncoderFactory<B> rightEncoderFactory) {
+    this.leftEncoderFactory = leftEncoderFactory;
+    this.rightEncoderFactory = rightEncoderFactory;
+  }
+
+  /**
+   * static initializer of the class.
+   *
+   * @param leftEncoderFactory  left coder.
+   * @param rightEncoderFactory right coder.
+   * @param <A>          type of the left element.
+   * @param <B>          type of the right element.
+   * @return the new PairEncoderFactory.
+   */
+  public static <A, B> PairEncoderFactory<A, B> of(final EncoderFactory<A> leftEncoderFactory,
+                                                   final EncoderFactory<B> rightEncoderFactory) {
+    return new PairEncoderFactory<>(leftEncoderFactory, rightEncoderFactory);
+  }
+
+  @Override
+  public Encoder<Pair<A, B>> create(final OutputStream outputStream) throws IOException {
+    return new PairEncoder<>(outputStream, leftEncoderFactory, rightEncoderFactory);
+  }
+
+  /**
+   * PairEncoder.
+   * @param <T1> type for the left coder.
+   * @param <T2> type for the right coder.
+   */
+  private final class PairEncoder<T1, T2> implements Encoder<Pair<T1, T2>> {
+
+    private final Encoder<T1> leftEncoder;
+    private final Encoder<T2> rightEncoder;
+
+    /**
+     * Constructor.
+     *
+     * @param outputStream the output stream to store the encoded bytes.
+     * @param leftEncoderFactory  the actual encoder to use for left elements.
+     * @param rightEncoderFactory the actual encoder to use for right elements.
+     * @throws IOException if fail to instantiate coders.
+     */
+    private PairEncoder(final OutputStream outputStream,
+                        final EncoderFactory<T1> leftEncoderFactory,
+                        final EncoderFactory<T2> rightEncoderFactory) throws IOException {
+      this.leftEncoder = leftEncoderFactory.create(outputStream);
+      this.rightEncoder = rightEncoderFactory.create(outputStream);
+    }
+
+    @Override
+    public void encode(final Pair<T1, T2> pair) throws IOException {
+      if (pair == null) {
+        throw new IOException("cannot encode a null pair");
+      }
+      leftEncoder.encode(pair.left());
+      rightEncoder.encode(pair.right());
+    }
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
index 64023198..441949e3 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
@@ -23,6 +23,7 @@
 public final class CompressionProperty extends EdgeExecutionProperty<CompressionProperty.Value> {
   /**
    * Constructor.
+   *
    * @param value value of the execution property.
    */
   private CompressionProperty(final Value value) {
@@ -31,6 +32,7 @@ private CompressionProperty(final Value value) {
 
   /**
    * Static method exposing the constructor.
+   *
    * @param value value of the new execution property.
    * @return the newly created execution property.
    */
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DecoderProperty.java
similarity index 76%
rename from common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
rename to common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DecoderProperty.java
index aae1c6e7..7ef507a8 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DecoderProperty.java
@@ -15,19 +15,19 @@
  */
 package edu.snu.nemo.common.ir.edge.executionproperty;
 
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
 import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
- * Coder ExecutionProperty.
+ * Decoder ExecutionProperty.
  */
-public final class CoderProperty extends EdgeExecutionProperty<Coder> {
+public final class DecoderProperty extends EdgeExecutionProperty<DecoderFactory> {
   /**
    * Constructor.
    *
    * @param value value of the execution property.
    */
-  private CoderProperty(final Coder value) {
+  private DecoderProperty(final DecoderFactory value) {
     super(value);
   }
 
@@ -37,7 +37,7 @@ private CoderProperty(final Coder value) {
    * @param value value of the new execution property.
    * @return the newly created execution property.
    */
-  public static CoderProperty of(final Coder value) {
-    return new CoderProperty(value);
+  public static DecoderProperty of(final DecoderFactory value) {
+    return new DecoderProperty(value);
   }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DecompressionProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DecompressionProperty.java
new file mode 100644
index 00000000..bfd09e08
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DecompressionProperty.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.edge.executionproperty;
+
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
+
+/**
+ * Decompression ExecutionProperty.
+ * It shares the value with {@link CompressionProperty}.
+ */
+public final class DecompressionProperty extends EdgeExecutionProperty<CompressionProperty.Value> {
+  /**
+   * Constructor.
+   *
+   * @param value value of the execution property.
+   */
+  private DecompressionProperty(final CompressionProperty.Value value) {
+    super(value);
+  }
+
+  /**
+   * Static method exposing the constructor.
+   *
+   * @param value value of the new execution property.
+   * @return the newly created execution property.
+   */
+  public static DecompressionProperty of(final CompressionProperty.Value value) {
+    return new DecompressionProperty(value);
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/EncoderProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/EncoderProperty.java
new file mode 100644
index 00000000..f214f17f
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/EncoderProperty.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.edge.executionproperty;
+
+import edu.snu.nemo.common.coder.EncoderFactory;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
+
+/**
+ * EncoderFactory ExecutionProperty.
+ */
+public final class EncoderProperty extends EdgeExecutionProperty<EncoderFactory> {
+  /**
+   * Constructor.
+   *
+   * @param value value of the execution property.
+   */
+  private EncoderProperty(final EncoderFactory value) {
+    super(value);
+  }
+
+  /**
+   * Static method exposing the constructor.
+   *
+   * @param value value of the new execution property.
+   * @return the newly created execution property.
+   */
+  public static EncoderProperty of(final EncoderFactory value) {
+    return new EncoderProperty(value);
+  }
+}
diff --git a/common/src/test/java/edu/snu/nemo/common/ir/LoopVertexTest.java b/common/src/test/java/edu/snu/nemo/common/ir/LoopVertexTest.java
index 4650aae3..558a9f76 100644
--- a/common/src/test/java/edu/snu/nemo/common/ir/LoopVertexTest.java
+++ b/common/src/test/java/edu/snu/nemo/common/ir/LoopVertexTest.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.common.ir;
 
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
diff --git a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
index f7eaad7f..1ca18ff9 100644
--- a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
+++ b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
@@ -15,15 +15,10 @@
  */
 package edu.snu.nemo.common.ir.executionproperty;
 
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
-import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.*;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
@@ -66,8 +61,10 @@ public void testPutGetAndRemove() {
     assertEquals(DataStoreProperty.Value.MemoryStore, edgeMap.get(DataStoreProperty.class).get());
     edgeMap.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
     assertEquals(DataFlowModelProperty.Value.Pull, edgeMap.get(DataFlowModelProperty.class).get());
-    edgeMap.put(CoderProperty.of(Coder.DUMMY_CODER));
-    assertEquals(Coder.DUMMY_CODER, edgeMap.get(CoderProperty.class).get());
+    edgeMap.put(EncoderProperty.of(EncoderFactory.DUMMY_ENCODER_FACTORY));
+    assertEquals(EncoderFactory.DUMMY_ENCODER_FACTORY, edgeMap.get(EncoderProperty.class).get());
+    edgeMap.put(DecoderProperty.of(DecoderFactory.DUMMY_DECODER_FACTORY));
+    assertEquals(DecoderFactory.DUMMY_DECODER_FACTORY, edgeMap.get(DecoderProperty.class).get());
 
     edgeMap.remove(DataFlowModelProperty.class);
     assertFalse(edgeMap.get(DataFlowModelProperty.class).isPresent());
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
index bf3253be..b85057b5 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
@@ -15,9 +15,12 @@
  */
 package edu.snu.nemo.compiler.frontend.beam;
 
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.compiler.frontend.beam.coder.BeamCoder;
+import edu.snu.nemo.compiler.frontend.beam.coder.BeamDecoderFactory;
+import edu.snu.nemo.compiler.frontend.beam.coder.BeamEncoderFactory;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
@@ -55,10 +58,11 @@
   private final PipelineOptions options;
   // loopVertexStack keeps track of where the beam program is: whether it is inside a composite transform or it is not.
   private final Stack<LoopVertex> loopVertexStack;
-  private final Map<PValue, BeamCoder> pValueToCoder;
+  private final Map<PValue, Pair<BeamEncoderFactory, BeamDecoderFactory>> pValueToCoder;
 
   /**
    * Constructor of the BEAM Visitor.
+   *
    * @param builder DAGBuilder to build the DAG with.
    * @param options Pipeline options.
    */
@@ -96,10 +100,11 @@ public void visitPrimitiveTransform(final TransformHierarchy.Node beamNode) {
       throw new UnsupportedOperationException(beamNode.toString());
     }
 
-    final IRVertex irVertex = convertToVertex(beamNode, builder, pValueToVertex, pValueToCoder, options,
-        loopVertexStack);
+    final IRVertex irVertex =
+        convertToVertex(beamNode, builder, pValueToVertex, pValueToCoder, options, loopVertexStack);
     beamNode.getOutputs().values().stream().filter(v -> v instanceof PCollection).map(v -> (PCollection) v)
-        .forEach(output -> pValueToCoder.put(output, new BeamCoder(output.getCoder())));
+        .forEach(output -> pValueToCoder.put(output,
+            Pair.of(new BeamEncoderFactory(output.getCoder()), new BeamDecoderFactory(output.getCoder()))));
 
     beamNode.getOutputs().values().forEach(output -> pValueToVertex.put(output, irVertex));
 
@@ -107,7 +112,9 @@ public void visitPrimitiveTransform(final TransformHierarchy.Node beamNode) {
         .forEach(pValue -> {
           final IRVertex src = pValueToVertex.get(pValue);
           final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex), src, irVertex);
-          edge.setProperty(CoderProperty.of(pValueToCoder.get(pValue)));
+          final Pair<BeamEncoderFactory, BeamDecoderFactory> coderPair = pValueToCoder.get(pValue);
+          edge.setProperty(EncoderProperty.of(coderPair.left()));
+          edge.setProperty(DecoderProperty.of(coderPair.right()));
           edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
           this.builder.connectVertices(edge);
         });
@@ -115,22 +122,24 @@ public void visitPrimitiveTransform(final TransformHierarchy.Node beamNode) {
 
   /**
    * Convert Beam node to IR vertex.
-   * @param beamNode input beam node.
-   * @param builder the DAG builder to add the vertex to.
-   * @param pValueToVertex PValue to Vertex map.
-   * @param pValueToCoder PValue to Coder map.
-   * @param options pipeline options.
+   *
+   * @param beamNode        input beam node.
+   * @param builder         the DAG builder to add the vertex to.
+   * @param pValueToVertex  PValue to Vertex map.
+   * @param pValueToCoder   PValue to EncoderFactory and DecoderFactory map.
+   * @param options         pipeline options.
    * @param loopVertexStack Stack to get the current loop vertex that the operator vertex will be assigned to.
-   * @param <I> input type.
-   * @param <O> output type.
+   * @param <I>             input type.
+   * @param <O>             output type.
    * @return newly created vertex.
    */
-  private static <I, O> IRVertex convertToVertex(final TransformHierarchy.Node beamNode,
-                                                 final DAGBuilder<IRVertex, IREdge> builder,
-                                                 final Map<PValue, IRVertex> pValueToVertex,
-                                                 final Map<PValue, BeamCoder> pValueToCoder,
-                                                 final PipelineOptions options,
-                                                 final Stack<LoopVertex> loopVertexStack) {
+  private static <I, O> IRVertex
+  convertToVertex(final TransformHierarchy.Node beamNode,
+                  final DAGBuilder<IRVertex, IREdge> builder,
+                  final Map<PValue, IRVertex> pValueToVertex,
+                  final Map<PValue, Pair<BeamEncoderFactory, BeamDecoderFactory>> pValueToCoder,
+                  final PipelineOptions options,
+                  final Stack<LoopVertex> loopVertexStack) {
     final PTransform beamTransform = beamNode.getTransform();
     final IRVertex irVertex;
     if (beamTransform instanceof Read.Bounded) {
@@ -147,13 +156,14 @@ public void visitPrimitiveTransform(final TransformHierarchy.Node beamNode) {
       pValueToVertex.put(view.getView(), irVertex);
       builder.addVertex(irVertex, loopVertexStack);
       // Coders for outgoing edges in CreateViewTransform.
-      // Since outgoing PValues for CreateViewTransform is PCollectionView, we cannot use PCollection::getCoder to
-      // obtain coders.
+      // Since outgoing PValues for CreateViewTransform is PCollectionView,
+      // we cannot use PCollection::getEncoderFactory to obtain coders.
       final Coder beamInputCoder = beamNode.getInputs().values().stream()
           .filter(v -> v instanceof PCollection).map(v -> (PCollection) v).findFirst()
           .orElseThrow(() -> new RuntimeException("No inputs provided to " + beamNode.getFullName())).getCoder();
       beamNode.getOutputs().values().stream()
-          .forEach(output -> pValueToCoder.put(output, getCoderForView(view.getView().getViewFn(), beamInputCoder)));
+          .forEach(output ->
+              pValueToCoder.put(output, getCoderPairForView(view.getView().getViewFn(), beamInputCoder)));
     } else if (beamTransform instanceof Window) {
       final Window<I> window = (Window<I>) beamTransform;
       final WindowTransform transform = new WindowTransform(window.getWindowFn());
@@ -187,35 +197,40 @@ public void visitPrimitiveTransform(final TransformHierarchy.Node beamNode) {
 
   /**
    * Connect side inputs to the vertex.
-   * @param builder the DAG builder to add the vertex to.
-   * @param sideInputs side inputs.
+   *
+   * @param builder        the DAG builder to add the vertex to.
+   * @param sideInputs     side inputs.
    * @param pValueToVertex PValue to Vertex map.
-   * @param pValueToCoder  PValue to Coder map.
-   * @param irVertex wrapper for a user operation in the IR. (Where the side input is headed to)
+   * @param pValueToCoder  PValue to Encoder/Decoder factory map.
+   * @param irVertex       wrapper for a user operation in the IR. (Where the side input is headed to)
    */
   private static void connectSideInputs(final DAGBuilder<IRVertex, IREdge> builder,
                                         final List<PCollectionView<?>> sideInputs,
                                         final Map<PValue, IRVertex> pValueToVertex,
-                                        final Map<PValue, BeamCoder> pValueToCoder,
+                                        final Map<PValue, Pair<BeamEncoderFactory, BeamDecoderFactory>> pValueToCoder,
                                         final IRVertex irVertex) {
     sideInputs.stream().filter(pValueToVertex::containsKey)
         .forEach(pValue -> {
           final IRVertex src = pValueToVertex.get(pValue);
           final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex),
               src, irVertex, true);
-          edge.setProperty(CoderProperty.of(pValueToCoder.get(pValue)));
+          final Pair<BeamEncoderFactory, BeamDecoderFactory> coder = pValueToCoder.get(pValue);
+          edge.setProperty(EncoderProperty.of(coder.left()));
+          edge.setProperty(DecoderProperty.of(coder.right()));
           edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
           builder.connectVertices(edge);
         });
   }
 
   /**
-   * Get appropriate coder for {@link PCollectionView}.
-   * @param viewFn {@link ViewFn} from the corresponding {@link View.CreatePCollectionView} transform
+   * Get appropriate encoder and decoder pair for {@link PCollectionView}.
+   *
+   * @param viewFn         {@link ViewFn} from the corresponding {@link View.CreatePCollectionView} transform
    * @param beamInputCoder Beam {@link Coder} for input value to {@link View.CreatePCollectionView}
-   * @return appropriate {@link BeamCoder}
+   * @return appropriate pair of {@link BeamEncoderFactory} and {@link BeamDecoderFactory}
    */
-  private static BeamCoder getCoderForView(final ViewFn viewFn, final Coder beamInputCoder) {
+  private static Pair<BeamEncoderFactory, BeamDecoderFactory> getCoderPairForView(final ViewFn viewFn,
+                                                                                  final Coder beamInputCoder) {
     final Coder beamOutputCoder;
     if (viewFn instanceof PCollectionViews.IterableViewFn) {
       beamOutputCoder = IterableCoder.of(beamInputCoder);
@@ -232,11 +247,12 @@ private static BeamCoder getCoderForView(final ViewFn viewFn, final Coder beamIn
     } else {
       throw new UnsupportedOperationException("Unsupported viewFn: " + viewFn.getClass());
     }
-    return new BeamCoder(beamOutputCoder);
+    return Pair.of(new BeamEncoderFactory(beamOutputCoder), new BeamDecoderFactory(beamOutputCoder));
   }
 
   /**
    * Get the edge type for the src, dst vertex.
+   *
    * @param src source vertex.
    * @param dst destination vertex.
    * @return the appropriate edge type.
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamCoder.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamCoder.java
deleted file mode 100644
index 56f67142..00000000
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamCoder.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.compiler.frontend.beam.coder;
-
-import edu.snu.nemo.common.coder.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VoidCoder;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * {@link Coder} from {@link org.apache.beam.sdk.coders.Coder}.
- * @param <T> element type.
- */
-public final class BeamCoder<T> implements Coder<T> {
-  private final org.apache.beam.sdk.coders.Coder<T> beamCoder;
-
-  /**
-   * Constructor of BeamCoder.
-   * @param beamCoder actual Beam coder to use.
-   */
-  public BeamCoder(final org.apache.beam.sdk.coders.Coder<T> beamCoder) {
-    this.beamCoder = beamCoder;
-  }
-
-  @Override
-  public void encode(final T value, final OutputStream outStream) throws IOException {
-    if (beamCoder instanceof VoidCoder) {
-      outStream.write(0);
-      return;
-    }
-    try {
-      beamCoder.encode(value, outStream);
-    } catch (final CoderException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public T decode(final InputStream inStream) throws IOException {
-    if (beamCoder instanceof VoidCoder && inStream.read() == -1) {
-      throw new IOException("End of stream reached");
-    }
-    try {
-      return beamCoder.decode(inStream);
-    } catch (final CoderException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public String toString() {
-    return beamCoder.toString();
-  }
-}
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
new file mode 100644
index 00000000..7ebea386
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.beam.coder;
+
+import edu.snu.nemo.common.coder.DecoderFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VoidCoder;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link DecoderFactory} from {@link org.apache.beam.sdk.coders.Coder}.
+ * @param <T> the type of element to encode.
+ */
+public final class BeamDecoderFactory<T> implements DecoderFactory<T> {
+  private final Coder<T> beamCoder;
+
+  /**
+   * Constructor of BeamDecoderFactory.
+   *
+   * @param beamCoder actual Beam coder to use.
+   */
+  public BeamDecoderFactory(final Coder<T> beamCoder) {
+    this.beamCoder = beamCoder;
+  }
+
+  @Override
+  public Decoder<T> create(final InputStream inputStream) {
+    if (beamCoder instanceof VoidCoder) {
+      return new BeamVoidDecoder<>(inputStream, beamCoder);
+    } else {
+      return new BeamDecoder<>(inputStream, beamCoder);
+    }
+  }
+
+  /**
+   * Abstract class for Beam Decoder.
+   * @param <T2> the type of element to decode.
+   */
+  private abstract class BeamAbstractDecoder<T2> implements Decoder<T2> {
+
+    private final Coder<T2> beamCoder;
+    private final InputStream inputStream;
+
+    /**
+     * Constructor.
+     *
+     * @param inputStream the input stream to decode.
+     * @param beamCoder   the actual beam coder to use.
+     */
+    protected BeamAbstractDecoder(final InputStream inputStream,
+                                  final Coder<T2> beamCoder) {
+      this.inputStream = inputStream;
+      this.beamCoder = beamCoder;
+    }
+
+    /**
+     * Decode the actual data internally.
+     *
+     * @return the decoded data.
+     * @throws IOException if fail to decode.
+     */
+    protected T2 decodeInternal() throws IOException {
+      try {
+        return beamCoder.decode(inputStream);
+      } catch (final CoderException e) {
+        throw new IOException(e);
+      }
+    }
+
+    /**
+     * @return the input stream.
+     */
+    protected InputStream getInputStream() {
+      return inputStream;
+    }
+  }
+
+  /**
+   * Beam Decoder for non void objects.
+   * @param <T2> the type of element to decode.
+   */
+  private final class BeamDecoder<T2> extends BeamAbstractDecoder<T2> {
+
+    /**
+     * Constructor.
+     *
+     * @param inputStream the input stream to decode.
+     * @param beamCoder   the actual beam coder to use.
+     */
+    private BeamDecoder(final InputStream inputStream,
+                        final Coder<T2> beamCoder) {
+      super(inputStream, beamCoder);
+    }
+
+    @Override
+    public T2 decode() throws IOException {
+      return decodeInternal();
+    }
+  }
+
+  /**
+   * Beam Decoder for {@link VoidCoder}.
+   * @param <T2> the type of element to decode.
+   */
+  private final class BeamVoidDecoder<T2> extends BeamAbstractDecoder<T2> {
+
+    /**
+     * Constructor.
+     *
+     * @param inputStream the input stream to decode.
+     * @param beamCoder   the actual beam coder to use.
+     */
+    private BeamVoidDecoder(final InputStream inputStream,
+                            final Coder<T2> beamCoder) {
+      super(inputStream, beamCoder);
+    }
+
+    @Override
+    public T2 decode() throws IOException {
+      if (getInputStream().read() == -1) {
+        throw new IOException("End of stream reached");
+      }
+      return decodeInternal();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return beamCoder.toString();
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
new file mode 100644
index 00000000..f67b96c1
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.beam.coder;
+
+import edu.snu.nemo.common.coder.EncoderFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VoidCoder;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * {@link EncoderFactory} from {@link Coder}.
+ * @param <T> the type of element to encode.
+ */
+public final class BeamEncoderFactory<T> implements EncoderFactory<T> {
+
+  private final Coder<T> beamCoder;
+
+  /**
+   * Constructor of BeamEncoderFactory.
+   *
+   * @param beamCoder actual Beam coder to use.
+   */
+  public BeamEncoderFactory(final Coder<T> beamCoder) {
+    this.beamCoder = beamCoder;
+  }
+
+  @Override
+  public Encoder<T> create(final OutputStream outputStream) {
+    if (beamCoder instanceof VoidCoder) {
+      return new BeamVoidEncoder<>(outputStream);
+    } else {
+      return new BeamEncoder<>(outputStream, beamCoder);
+    }
+  }
+
+  /**
+   * Beam Encoder for non void objects.
+   *
+   * @param <T2> the type of element to decode.
+   */
+  private final class BeamEncoder<T2> implements Encoder<T2> {
+
+    private final Coder<T2> beamCoder;
+    private final OutputStream outputStream;
+
+    /**
+     * Constructor.
+     *
+     * @param outputStream the output stream to store the encoded bytes.
+     * @param beamCoder    the actual beam coder to use.
+     */
+    private BeamEncoder(final OutputStream outputStream,
+                        final Coder<T2> beamCoder) {
+      this.outputStream = outputStream;
+      this.beamCoder = beamCoder;
+    }
+
+    @Override
+    public void encode(final T2 element) throws IOException {
+      try {
+        beamCoder.encode(element, outputStream);
+      } catch (final CoderException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  /**
+   * Beam Decoder for {@link VoidCoder}.
+   *
+   * @param <T2> the type of element to decode.
+   */
+  private final class BeamVoidEncoder<T2> implements Encoder<T2> {
+
+    private final OutputStream outputStream;
+
+    /**
+     * Constructor.
+     *
+     * @param outputStream the output stream to store the encoded bytes.
+     */
+    private BeamVoidEncoder(final OutputStream outputStream) {
+      this.outputStream = outputStream;
+    }
+
+    @Override
+    public void encode(final T2 element) throws IOException {
+      outputStream.write(0); // emit 0 instead of null to enable to count emitted elements.
+    }
+  }
+
+  @Override
+  public String toString() {
+    return beamCoder.toString();
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
index 9c638925..fa5d3804 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
@@ -20,7 +20,7 @@
 import java.io.*;
 
 /**
- * Coder for float[].
+ * EncoderFactory for float[].
  */
 public final class FloatArrayCoder extends AtomicCoder<float[]> {
   /**
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/IntArrayCoder.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/IntArrayCoder.java
index 80f53c01..c180bd67 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/IntArrayCoder.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/IntArrayCoder.java
@@ -20,7 +20,7 @@
 import java.io.*;
 
 /**
- * Coder for int[].
+ * EncoderFactory for int[].
  */
 public final class IntArrayCoder extends AtomicCoder<int[]> {
   /**
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkCoder.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkCoder.java
deleted file mode 100644
index 6317326c..00000000
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkCoder.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.compiler.frontend.spark.coder;
-
-import edu.snu.nemo.common.coder.Coder;
-import org.apache.spark.serializer.Serializer;
-import scala.reflect.ClassTag$;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * Kryo Spark Coder for serialization.
- * @param <T> type of the object to (de)serialize.
- */
-public final class SparkCoder<T> implements Coder<T> {
-  private final Serializer serializer;
-
-  /**
-   * Default constructor.
-   * @param serializer kryo serializer.
-   */
-  public SparkCoder(final Serializer serializer) {
-    this.serializer = serializer;
-  }
-
-  @Override
-  public void encode(final T element, final OutputStream outStream) throws IOException {
-    serializer.newInstance().serializeStream(outStream).writeObject(element, ClassTag$.MODULE$.Any());
-  }
-
-  @Override
-  public T decode(final InputStream inStream) throws IOException {
-    final T obj = (T) serializer.newInstance().deserializeStream(inStream).readObject(ClassTag$.MODULE$.Any());
-    return obj;
-  }
-}
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java
new file mode 100644
index 00000000..605b7662
--- /dev/null
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkDecoderFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.spark.coder;
+
+import edu.snu.nemo.common.coder.DecoderFactory;
+import org.apache.spark.serializer.DeserializationStream;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.serializer.SerializerInstance;
+import scala.reflect.ClassTag$;
+
+import java.io.InputStream;
+
+/**
+ * Spark DecoderFactory for serialization.
+ * @param <T> type of the object to deserialize.
+ */
+public final class SparkDecoderFactory<T> implements DecoderFactory<T> {
+  private final Serializer serializer;
+
+  /**
+   * Default constructor.
+   *
+   * @param serializer Spark serializer.
+   */
+  public SparkDecoderFactory(final Serializer serializer) {
+    this.serializer = serializer;
+  }
+
+  @Override
+  public Decoder<T> create(final InputStream inputStream) {
+    return new SparkDecoder<>(inputStream, serializer.newInstance());
+  }
+
+  /**
+   * SparkDecoder.
+   * @param <T2> type of the object to deserialize.
+   */
+  private final class SparkDecoder<T2> implements Decoder<T2> {
+
+    private final DeserializationStream in;
+
+    /**
+     * Constructor.
+     *
+     * @param inputStream             the input stream to decode.
+     * @param sparkSerializerInstance the actual spark serializer instance to use.
+     */
+    private SparkDecoder(final InputStream inputStream,
+                         final SerializerInstance sparkSerializerInstance) {
+      this.in = sparkSerializerInstance.deserializeStream(inputStream);
+    }
+
+    @Override
+    public T2 decode() {
+      return (T2) in.readObject(ClassTag$.MODULE$.Any());
+    }
+  }
+}
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java
new file mode 100644
index 00000000..970a46c8
--- /dev/null
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.spark.coder;
+
+import edu.snu.nemo.common.coder.EncoderFactory;
+import org.apache.spark.serializer.SerializationStream;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.serializer.SerializerInstance;
+import scala.reflect.ClassTag$;
+
+import java.io.OutputStream;
+
+/**
+ * Spark EncoderFactory for serialization.
+ * @param <T> type of the object to serialize.
+ */
+public final class SparkEncoderFactory<T> implements EncoderFactory<T> {
+  private final Serializer serializer;
+
+  /**
+   * Default constructor.
+   *
+   * @param serializer Spark serializer.
+   */
+  public SparkEncoderFactory(final Serializer serializer) {
+    this.serializer = serializer;
+  }
+
+  @Override
+  public Encoder<T> create(final OutputStream outputStream) {
+    return new SparkEncoder<>(outputStream, serializer.newInstance());
+  }
+
+  /**
+   * SparkEncoder.
+   * @param <T2> type of the object to serialize.
+   */
+  private final class SparkEncoder<T2> implements Encoder<T2> {
+
+    private final SerializationStream out;
+
+    /**
+     * Constructor.
+     *
+     * @param outputStream            the output stream to store the encoded bytes.
+     * @param sparkSerializerInstance the actual spark serializer instance to use.
+     */
+    private SparkEncoder(final OutputStream outputStream,
+                         final SerializerInstance sparkSerializerInstance) {
+      this.out = sparkSerializerInstance.serializeStream(outputStream);
+    }
+
+    @Override
+    public void encode(final T2 element) {
+      out.writeObject(element, ClassTag$.MODULE$.Any());
+    }
+  }
+}
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
index 6c4c3955..08731a48 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
@@ -19,14 +19,16 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor;
-import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder;
+import edu.snu.nemo.compiler.frontend.spark.coder.SparkDecoderFactory;
+import edu.snu.nemo.compiler.frontend.spark.coder.SparkEncoderFactory;
 import edu.snu.nemo.compiler.frontend.spark.transform.CollectTransform;
 import edu.snu.nemo.compiler.frontend.spark.transform.GroupByKeyTransform;
 import edu.snu.nemo.compiler.frontend.spark.transform.ReduceByKeyTransform;
@@ -87,8 +89,10 @@ public static Serializer deriveSerializerFrom(final org.apache.spark.SparkContex
    * @param <T>             type of the return data.
    * @return the data collected.
    */
-  public static <T> List<T> collect(final DAG<IRVertex, IREdge> dag, final Stack<LoopVertex> loopVertexStack,
-                                    final IRVertex lastVertex, final Serializer serializer) {
+  public static <T> List<T> collect(final DAG<IRVertex, IREdge> dag,
+                                    final Stack<LoopVertex> loopVertexStack,
+                                    final IRVertex lastVertex,
+                                    final Serializer serializer) {
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(dag);
 
     // save result in a temporary file
@@ -100,7 +104,8 @@ public static Serializer deriveSerializerFrom(final org.apache.spark.SparkContex
 
     final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex, collectVertex),
         lastVertex, collectVertex);
-    newEdge.setProperty(CoderProperty.of(new SparkCoder(serializer)));
+    newEdge.setProperty(EncoderProperty.of(new SparkEncoderFactory(serializer)));
+    newEdge.setProperty(DecoderProperty.of(new SparkDecoderFactory(serializer)));
     newEdge.setProperty(SPARK_KEY_EXTRACTOR_PROP);
     builder.connectVertices(newEdge);
 
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
index 5065f9ce..14f764a9 100644
--- a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
@@ -19,11 +19,11 @@ import java.util
 
 import edu.snu.nemo.common.dag.DAGBuilder
 import edu.snu.nemo.common.ir.edge.IREdge
-import edu.snu.nemo.common.ir.edge.executionproperty.{CoderProperty, KeyExtractorProperty}
+import edu.snu.nemo.common.ir.edge.executionproperty.{DecoderProperty, EncoderProperty, KeyExtractorProperty}
 import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty
 import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
 import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor
-import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder
+import edu.snu.nemo.compiler.frontend.spark.coder.{SparkDecoderFactory, SparkEncoderFactory}
 import edu.snu.nemo.compiler.frontend.spark.core.SparkFrontendUtils
 import edu.snu.nemo.compiler.frontend.spark.transform.ReduceByKeyTransform
 import org.apache.hadoop.conf.Configuration
@@ -72,7 +72,10 @@ final class PairRDDFunctions[K: ClassTag, V: ClassTag] protected[rdd] (
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(self.lastVertex, reduceByKeyVertex),
       self.lastVertex, reduceByKeyVertex)
     newEdge.setProperty(
-      CoderProperty.of(new SparkCoder[Tuple2[K, V]](self.serializer))
+      EncoderProperty.of(new SparkEncoderFactory[Tuple2[K, V]](self.serializer))
+        .asInstanceOf[EdgeExecutionProperty[_ <: Serializable]])
+    newEdge.setProperty(
+      DecoderProperty.of(new SparkDecoderFactory[Tuple2[K, V]](self.serializer))
         .asInstanceOf[EdgeExecutionProperty[_ <: Serializable]])
     newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
     builder.connectVertices(newEdge)
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
index 135f6980..ad583999 100644
--- a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
@@ -20,11 +20,11 @@ import java.util
 import edu.snu.nemo.client.JobLauncher
 import edu.snu.nemo.common.dag.{DAG, DAGBuilder}
 import edu.snu.nemo.common.ir.edge.IREdge
-import edu.snu.nemo.common.ir.edge.executionproperty.{CoderProperty, KeyExtractorProperty}
+import edu.snu.nemo.common.ir.edge.executionproperty.{DecoderProperty, EncoderProperty, KeyExtractorProperty}
 import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty
 import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
 import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor
-import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder
+import edu.snu.nemo.compiler.frontend.spark.coder.{SparkDecoderFactory, SparkEncoderFactory}
 import edu.snu.nemo.compiler.frontend.spark.core.SparkFrontendUtils
 import edu.snu.nemo.compiler.frontend.spark.transform._
 import org.apache.hadoop.io.WritableFactory
@@ -51,8 +51,10 @@ final class RDD[T: ClassTag] protected[rdd] (
 
   protected[rdd] val serializer: Serializer = SparkFrontendUtils.deriveSerializerFrom(_sc)
   private val loopVertexStack = new util.Stack[LoopVertex]
-  private val coderProperty: EdgeExecutionProperty[_ <: Serializable] =
-    CoderProperty.of(new SparkCoder[T](serializer)).asInstanceOf[EdgeExecutionProperty[_ <: Serializable]]
+  private val encoderProperty: EdgeExecutionProperty[_ <: Serializable] =
+    EncoderProperty.of(new SparkEncoderFactory[T](serializer)).asInstanceOf[EdgeExecutionProperty[_ <: Serializable]]
+  private val decoderProperty: EdgeExecutionProperty[_ <: Serializable] =
+    DecoderProperty.of(new SparkDecoderFactory[T](serializer)).asInstanceOf[EdgeExecutionProperty[_ <: Serializable]]
   private val keyExtractorProperty: KeyExtractorProperty = KeyExtractorProperty.of(new SparkKeyExtractor)
 
   /**
@@ -137,7 +139,8 @@ final class RDD[T: ClassTag] protected[rdd] (
 
     val newEdge: IREdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, mapVertex),
       lastVertex, mapVertex)
-    newEdge.setProperty(coderProperty)
+    newEdge.setProperty(encoderProperty)
+    newEdge.setProperty(decoderProperty)
     newEdge.setProperty(keyExtractorProperty)
     builder.connectVertices(newEdge)
 
@@ -156,7 +159,8 @@ final class RDD[T: ClassTag] protected[rdd] (
 
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, flatMapVertex),
       lastVertex, flatMapVertex)
-    newEdge.setProperty(coderProperty)
+    newEdge.setProperty(encoderProperty)
+    newEdge.setProperty(decoderProperty)
     newEdge.setProperty(keyExtractorProperty)
     builder.connectVertices(newEdge)
 
@@ -186,7 +190,8 @@ final class RDD[T: ClassTag] protected[rdd] (
 
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, reduceVertex),
       lastVertex, reduceVertex)
-    newEdge.setProperty(coderProperty)
+    newEdge.setProperty(encoderProperty)
+    newEdge.setProperty(decoderProperty)
     newEdge.setProperty(keyExtractorProperty)
 
     builder.connectVertices(newEdge)
@@ -210,7 +215,8 @@ final class RDD[T: ClassTag] protected[rdd] (
     builder.addVertex(flatMapVertex, loopVertexStack)
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, flatMapVertex),
       lastVertex, flatMapVertex)
-    newEdge.setProperty(coderProperty)
+    newEdge.setProperty(encoderProperty)
+    newEdge.setProperty(decoderProperty)
     newEdge.setProperty(keyExtractorProperty)
 
     builder.connectVertices(newEdge)
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
index 1d1a651d..0ffddaa6 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
@@ -50,6 +50,7 @@ public CompressionPass(final CompressionProperty.Value compression) {
     dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream()
         .filter(e -> !vertex.getPropertyValue(StageIdProperty.class).get()
             .equals(e.getSrc().getPropertyValue(StageIdProperty.class).get()))
+        .filter(edge -> !edge.getPropertyValue(CompressionProperty.class).isPresent())
         .forEach(edge -> edge.setProperty(CompressionProperty.of(compression))));
 
     return dag;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
new file mode 100644
index 00000000..6ec2fde8
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DecompressionPass.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecompressionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
+
+
+/**
+ * A pass for applying decompression algorithm for data flowing between vertices.
+ * It always
+ */
+public final class DecompressionPass extends AnnotatingPass {
+
+  /**
+   * Constructor.
+   */
+  public DecompressionPass() {
+    super(CompressionProperty.class);
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream()
+        .filter(e -> !vertex.getPropertyValue(StageIdProperty.class).get()
+            .equals(e.getSrc().getPropertyValue(StageIdProperty.class).get()))
+        // Find edges which have a compression property but not decompression property.
+        .filter(edge -> edge.getPropertyValue(CompressionProperty.class).isPresent()
+            && !edge.getPropertyValue(DecompressionProperty.class).isPresent())
+        .forEach(edge -> edge.setProperty(DecompressionProperty.of(
+            edge.getPropertyValue(CompressionProperty.class).get()))));
+
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java
similarity index 64%
rename from compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java
rename to compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java
index eade93d1..2283bfb5 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeDecoderPass.java
@@ -15,32 +15,33 @@
  */
 package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
 
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 /**
- * Pass for initiating IREdge Coder ExecutionProperty with default dummy coder.
+ * Pass for initiating IREdge Decoder ExecutionProperty with default dummy coder.
  */
-public final class DefaultEdgeCoderPass extends AnnotatingPass {
+public final class DefaultEdgeDecoderPass extends AnnotatingPass {
 
-  private static final CoderProperty DEFAULT_CODER_PROPERTY = CoderProperty.of(Coder.DUMMY_CODER);
+  private static final DecoderProperty DEFAULT_DECODER_PROPERTY =
+      DecoderProperty.of(DecoderFactory.DUMMY_DECODER_FACTORY);
 
   /**
    * Default constructor.
    */
-  public DefaultEdgeCoderPass() {
-    super(CoderProperty.class);
+  public DefaultEdgeDecoderPass() {
+    super(DecoderProperty.class);
   }
 
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.topologicalDo(irVertex ->
         dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
-          if (!irEdge.getPropertyValue(CoderProperty.class).isPresent()) {
-            irEdge.setProperty(DEFAULT_CODER_PROPERTY);
+          if (!irEdge.getPropertyValue(DecoderProperty.class).isPresent()) {
+            irEdge.setProperty(DEFAULT_DECODER_PROPERTY);
           }
         }));
     return dag;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java
new file mode 100644
index 00000000..0238c72b
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeEncoderPass.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.coder.EncoderFactory;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+
+/**
+ * Pass for initiating IREdge Encoder ExecutionProperty with default dummy coder.
+ */
+public final class DefaultEdgeEncoderPass extends AnnotatingPass {
+
+  private static final EncoderProperty DEFAULT_DECODER_PROPERTY =
+      EncoderProperty.of(EncoderFactory.DUMMY_ENCODER_FACTORY);
+
+  /**
+   * Default constructor.
+   */
+  public DefaultEdgeEncoderPass() {
+    super(EncoderProperty.class);
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    dag.topologicalDo(irVertex ->
+        dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
+          if (!irEdge.getPropertyValue(EncoderProperty.class).isPresent()) {
+            irEdge.setProperty(DEFAULT_DECODER_PROPERTY);
+          }
+        }));
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
index adf862a4..b0ab2b17 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
@@ -31,12 +31,14 @@
   public PrimitiveCompositePass() {
     super(Arrays.asList(
         new DefaultParallelismPass(), // annotating after reshaping passes, before stage partitioning
-        new DefaultEdgeCoderPass(),
+        new DefaultEdgeEncoderPass(),
+        new DefaultEdgeDecoderPass(),
         new DefaultStagePartitioningPass(),
         new ReviseInterStageEdgeDataStorePass(), // after stage partitioning
         new DefaultEdgeUsedDataHandlingPass(),
         new ScheduleGroupPass(),
-        new CompressionPass()
+        new CompressionPass(),
+        new DecompressionPass()
     ));
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
index 64b1060f..eef0b203 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
@@ -15,9 +15,11 @@
  */
 package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
@@ -150,9 +152,13 @@ private static void mergeAndAddToBuilder(final List<OperatorVertex> ovs, final D
               outListToModify.remove(e);
               final IREdge newIrEdge = new IREdge(e.getPropertyValue(DataCommunicationPatternProperty.class).get(),
                   operatorVertexToUse, e.getDst());
-              final Optional<Coder> coderProperty = e.getPropertyValue(CoderProperty.class);
-              if (coderProperty.isPresent()) {
-                newIrEdge.setProperty(CoderProperty.of(coderProperty.get()));
+              final Optional<EncoderFactory> encoderProperty = e.getPropertyValue(EncoderProperty.class);
+              if (encoderProperty.isPresent()) {
+                newIrEdge.setProperty(EncoderProperty.of(encoderProperty.get()));
+              }
+              final Optional<DecoderFactory> decoderProperty = e.getPropertyValue(DecoderProperty.class);
+              if (decoderProperty.isPresent()) {
+                newIrEdge.setProperty(DecoderProperty.of(decoderProperty.get()));
               }
               outListToModify.add(newIrEdge);
             });
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
index 41392643..1117a7cc 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
@@ -18,7 +18,8 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
@@ -66,7 +67,8 @@ public DataSkewReshapingPass() {
             // We then insert the dynamicOptimizationVertex between the vertex and incoming vertices.
             final IREdge newEdge = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
                 edge.getSrc(), metricCollectionBarrierVertex);
-            newEdge.setProperty(CoderProperty.of(edge.getPropertyValue(CoderProperty.class).get()));
+            newEdge.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
+            newEdge.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
 
             final IREdge edgeToGbK = new IREdge(edge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
                 metricCollectionBarrierVertex, v, edge.isSideInput());
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
index 10d76e1a..072c1637 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
@@ -17,7 +17,8 @@
 
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.common.dag.DAG;
@@ -288,7 +289,8 @@ public LoopInvariantCodeMotionPass() {
                 edgesToRemove.add(edge);
                 final IREdge newEdge = new IREdge(edge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
                     candidate.getKey(), edge.getDst(), edge.isSideInput());
-                newEdge.setProperty(CoderProperty.of(edge.getPropertyValue(CoderProperty.class).get()));
+                newEdge.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
+                newEdge.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
                 edgesToAdd.add(newEdge);
               });
           final List<IREdge> listToModify = inEdges.getOrDefault(loopVertex, new ArrayList<>());
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
index e899da97..9bf434a7 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
@@ -17,7 +17,8 @@
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
@@ -63,7 +64,8 @@ public SailfishRelayReshapingPass() {
             edge.copyExecutionPropertiesTo(newEdgeToMerger);
             final IREdge newEdgeFromMerger = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
                 iFileMergerVertex, v);
-            newEdgeFromMerger.setProperty(CoderProperty.of(edge.getPropertyValue(CoderProperty.class).get()));
+            newEdgeFromMerger.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
+            newEdgeFromMerger.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
             builder.connectVertices(newEdgeToMerger);
             builder.connectVertices(newEdgeFromMerger);
           } else {
diff --git a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/sql/JavaUserDefinedTypedAggregation.java b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/sql/JavaUserDefinedTypedAggregation.java
index 00d2cb35..c6bec6a5 100644
--- a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/sql/JavaUserDefinedTypedAggregation.java
+++ b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/sql/JavaUserDefinedTypedAggregation.java
@@ -193,7 +193,7 @@ public Double finish(final Average reduction) {
     }
 
     /**
-     * Specifies the Encoder for the intermediate value type.
+     * Specifies the EncoderFactory for the intermediate value type.
      *
      * @return buffer encoder.
      */
@@ -202,7 +202,7 @@ public Double finish(final Average reduction) {
     }
 
     /**
-     * Specifies the Encoder for the final output value type.
+     * Specifies the EncoderFactory for the final output value type.
      *
      * @return output encoder.
      */
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index c9d3cc2d..69f5a173 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -17,7 +17,9 @@
 
 import com.google.protobuf.ByteString;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecompressionProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.conf.JobConf;
@@ -108,15 +110,21 @@ private void launchTask(final Task task) {
           new TaskStateManager(task, executorId, persistentConnectionToMasterMap, metricMessageSender);
 
       task.getTaskIncomingEdges().forEach(e -> serializerManager.register(e.getId(),
-          e.getPropertyValue(CoderProperty.class).get(), e.getPropertyValue(CompressionProperty.class)
-              .orElse(null)));
+          e.getPropertyValue(EncoderProperty.class).get(),
+          e.getPropertyValue(DecoderProperty.class).get(),
+          e.getPropertyValue(CompressionProperty.class).orElse(null),
+          e.getPropertyValue(DecompressionProperty.class).orElse(null)));
       task.getTaskOutgoingEdges().forEach(e -> serializerManager.register(e.getId(),
-          e.getPropertyValue(CoderProperty.class).get(), e.getPropertyValue(CompressionProperty.class).
-              orElse(null)));
+          e.getPropertyValue(EncoderProperty.class).get(),
+          e.getPropertyValue(DecoderProperty.class).get(),
+          e.getPropertyValue(CompressionProperty.class).orElse(null),
+          e.getPropertyValue(DecompressionProperty.class).orElse(null)));
       irDag.getVertices().forEach(v -> {
         irDag.getOutgoingEdgesOf(v).forEach(e -> serializerManager.register(e.getId(),
-            e.getPropertyValue(CoderProperty.class).get(), e.getPropertyValue(CompressionProperty.class)
-                .orElse(null)));
+            e.getPropertyValue(EncoderProperty.class).get(),
+            e.getPropertyValue(DecoderProperty.class).get(),
+            e.getPropertyValue(CompressionProperty.class).orElse(null),
+            e.getPropertyValue(DecompressionProperty.class).orElse(null)));
       });
 
       new TaskExecutor(
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
index ee0aa46b..69eb9bab 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
@@ -17,10 +17,12 @@
 
 import com.google.common.io.CountingInputStream;
 import edu.snu.nemo.common.DirectByteArrayOutputStream;
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
-import edu.snu.nemo.runtime.executor.data.streamchainer.StreamChainer;
+import edu.snu.nemo.runtime.executor.data.streamchainer.DecodeStreamChainer;
+import edu.snu.nemo.runtime.executor.data.streamchainer.EncodeStreamChainer;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,18 +49,19 @@ private DataUtil() {
   /**
    * Serializes the elements in a non-serialized partition into an output stream.
    *
-   * @param coder                  the coder to encode the elements.
+   * @param encoderFactory                the encoderFactory to encode the elements.
    * @param nonSerializedPartition the non-serialized partition to serialize.
    * @param bytesOutputStream      the output stream to write.
    * @return total number of elements in the partition.
    * @throws IOException if fail to serialize.
    */
-  public static long serializePartition(final Coder coder,
+  public static long serializePartition(final EncoderFactory encoderFactory,
                                         final NonSerializedPartition nonSerializedPartition,
                                         final OutputStream bytesOutputStream) throws IOException {
     long elementsCount = 0;
+    final EncoderFactory.Encoder encoder = encoderFactory.create(bytesOutputStream);
     for (final Object element : nonSerializedPartition.getData()) {
-      coder.encode(element, bytesOutputStream);
+      encoder.encode(element);
       elementsCount++;
     }
 
@@ -77,9 +80,10 @@ public static long serializePartition(final Coder coder,
    * @throws IOException if fail to deserialize.
    */
   public static <K extends Serializable> NonSerializedPartition deserializePartition(final long elementsInPartition,
-                                                            final Serializer serializer,
-                                                            final K key,
-                                                            final InputStream inputStream) throws IOException {
+                                                                                     final Serializer serializer,
+                                                                                     final K key,
+                                                                                     final InputStream inputStream)
+      throws IOException {
     final List deserializedData = new ArrayList();
     final InputStreamIterator iterator = new InputStreamIterator(Collections.singletonList(inputStream).iterator(),
         serializer, elementsInPartition);
@@ -105,9 +109,10 @@ public static long serializePartition(final Coder coder,
     for (final NonSerializedPartition<K> partitionToConvert : partitionsToConvert) {
       try (
           final DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream();
-          final OutputStream wrappedStream = buildOutputStream(bytesOutputStream, serializer.getStreamChainers());
+          final OutputStream wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
       ) {
-        final long elementsTotal = serializePartition(serializer.getCoder(), partitionToConvert, wrappedStream);
+        final long elementsTotal =
+            serializePartition(serializer.getEncoderFactory(), partitionToConvert, wrappedStream);
         // We need to close wrappedStream on here, because DirectByteArrayOutputStream:getBufDirectly() returns
         // inner buffer directly, which can be an unfinished(not flushed) buffer.
         wrappedStream.close();
@@ -190,13 +195,14 @@ public static Iterable concatNonSerPartitions(final Iterable<NonSerializedPartit
   }
 
   /**
-   * An iterator that emits objects from {@link InputStream} using the corresponding {@link Coder}.
+   * An iterator that emits objects from {@link InputStream} using the corresponding {@link DecoderFactory}.
+   *
    * @param <T> The type of elements.
    */
   public static final class InputStreamIterator<T> implements IteratorWithNumBytes<T> {
 
     private final Iterator<InputStream> inputStreams;
-    private final Serializer<T> serializer;
+    private final Serializer<?, T> serializer;
     private final long limit;
 
     private volatile CountingInputStream serializedCountingStream = null;
@@ -204,17 +210,19 @@ public static Iterable concatNonSerPartitions(final Iterable<NonSerializedPartit
     private volatile boolean hasNext = false;
     private volatile T next;
     private volatile boolean cannotContinueDecoding = false;
+    private volatile DecoderFactory.Decoder<T> decoder = null;
     private volatile long elementsDecoded = 0;
     private volatile long numSerializedBytes = 0;
     private volatile long numEncodedBytes = 0;
 
     /**
-     * Construct {@link Iterator} from {@link InputStream} and {@link Coder}.
+     * Construct {@link Iterator} from {@link InputStream} and {@link DecoderFactory}.
      *
      * @param inputStreams The streams to read data from.
      * @param serializer   The serializer.
      */
-    public InputStreamIterator(final Iterator<InputStream> inputStreams, final Serializer<T> serializer) {
+    public InputStreamIterator(final Iterator<InputStream> inputStreams,
+                               final Serializer<?, T> serializer) {
       this.inputStreams = inputStreams;
       this.serializer = serializer;
       // -1 means no limit.
@@ -222,7 +230,7 @@ public InputStreamIterator(final Iterator<InputStream> inputStreams, final Seria
     }
 
     /**
-     * Construct {@link Iterator} from {@link InputStream} and {@link Coder}.
+     * Construct {@link Iterator} from {@link InputStream} and {@link DecoderFactory}.
      *
      * @param inputStreams The streams to read data from.
      * @param serializer   The serializer.
@@ -230,7 +238,7 @@ public InputStreamIterator(final Iterator<InputStream> inputStreams, final Seria
      */
     public InputStreamIterator(
         final Iterator<InputStream> inputStreams,
-        final Serializer<T> serializer,
+        final Serializer<?, T> serializer,
         final long limit) {
       if (limit < 0) {
         throw new IllegalArgumentException("Negative limit not allowed.");
@@ -254,11 +262,12 @@ public boolean hasNext() {
       }
       while (true) {
         try {
-          if (encodedCountingStream == null) {
+          if (decoder == null) {
             if (inputStreams.hasNext()) {
               serializedCountingStream = new CountingInputStream(inputStreams.next());
               encodedCountingStream = new CountingInputStream(buildInputStream(
-                  serializedCountingStream, serializer.getStreamChainers()));
+                  serializedCountingStream, serializer.getDecodeStreamChainers()));
+              decoder = serializer.getDecoderFactory().create(encodedCountingStream);
             } else {
               cannotContinueDecoding = true;
               return false;
@@ -269,7 +278,7 @@ public boolean hasNext() {
           throw new RuntimeException(e);
         }
         try {
-          next = serializer.getCoder().decode(encodedCountingStream);
+          next = decoder.decode();
           hasNext = true;
           elementsDecoded++;
           return true;
@@ -279,6 +288,7 @@ public boolean hasNext() {
           numEncodedBytes += encodedCountingStream.getCount();
           serializedCountingStream = null;
           encodedCountingStream = null;
+          decoder = null;
         }
       }
     }
@@ -313,50 +323,54 @@ public long getNumEncodedBytes() {
   }
 
   /**
-   * Chain {@link InputStream} with {@link StreamChainer}s.
+   * Chain {@link InputStream} with {@link DecodeStreamChainer}s.
    *
-   * @param in             the {@link InputStream}.
-   * @param streamChainers the list of {@link StreamChainer} to be applied on the stream.
-   * @return chained       {@link InputStream}.
-   * @throws IOException   if fail to create new stream.
+   * @param in                   the {@link InputStream}.
+   * @param decodeStreamChainers the list of {@link DecodeStreamChainer} to be applied on the stream.
+   * @return chained {@link InputStream}.
+   * @throws IOException if fail to create new stream.
    */
-  public static InputStream buildInputStream(final InputStream in, final List<StreamChainer> streamChainers)
-  throws IOException {
+  public static InputStream buildInputStream(final InputStream in,
+                                             final List<DecodeStreamChainer> decodeStreamChainers)
+      throws IOException {
     InputStream chained = in;
-    for (final StreamChainer streamChainer : streamChainers) {
-      chained = streamChainer.chainInput(chained);
+    for (final DecodeStreamChainer encodeStreamChainer : decodeStreamChainers) {
+      chained = encodeStreamChainer.chainInput(chained);
     }
     return chained;
   }
 
   /**
-   * Chain {@link OutputStream} with {@link StreamChainer}s.
+   * Chain {@link OutputStream} with {@link EncodeStreamChainer}s.
    *
-   * @param out            the {@link OutputStream}.
-   * @param streamChainers the list of {@link StreamChainer} to be applied on the stream.
-   * @return chained       {@link OutputStream}.
-   * @throws IOException   if fail to create new stream.
+   * @param out                  the {@link OutputStream}.
+   * @param encodeStreamChainers the list of {@link EncodeStreamChainer} to be applied on the stream.
+   * @return chained {@link OutputStream}.
+   * @throws IOException if fail to create new stream.
    */
-  public static OutputStream buildOutputStream(final OutputStream out, final List<StreamChainer> streamChainers)
-  throws IOException {
+  public static OutputStream buildOutputStream(final OutputStream out,
+                                               final List<EncodeStreamChainer> encodeStreamChainers)
+      throws IOException {
     OutputStream chained = out;
-    final List<StreamChainer> temporaryStreamChainerList = new ArrayList<>(streamChainers);
-    Collections.reverse(temporaryStreamChainerList);
-    for (final StreamChainer streamChainer : temporaryStreamChainerList) {
-      chained = streamChainer.chainOutput(chained);
+    final List<EncodeStreamChainer> temporaryEncodeStreamChainerList = new ArrayList<>(encodeStreamChainers);
+    Collections.reverse(temporaryEncodeStreamChainerList);
+    for (final EncodeStreamChainer encodeStreamChainer : temporaryEncodeStreamChainerList) {
+      chained = encodeStreamChainer.chainOutput(chained);
     }
     return chained;
   }
 
   /**
    * {@link Iterator} with interface to access to the number of bytes.
+   *
    * @param <T> the type of decoded object
    */
   public interface IteratorWithNumBytes<T> extends Iterator<T> {
     /**
      * Create an {@link IteratorWithNumBytes}, with no information about the number of bytes.
+     *
      * @param innerIterator {@link Iterator} to wrap
-     * @param <E> the type of decoded object
+     * @param <E>           the type of decoded object
      * @return an {@link IteratorWithNumBytes}, with no information about the number of bytes
      */
     static <E> IteratorWithNumBytes<E> of(final Iterator<E> innerIterator) {
@@ -385,10 +399,11 @@ public E next() {
 
     /**
      * Create an {@link IteratorWithNumBytes}, with the number of bytes in decoded and serialized form.
-     * @param innerIterator {@link Iterator} to wrap
+     *
+     * @param innerIterator      {@link Iterator} to wrap
      * @param numSerializedBytes the number of bytes in serialized form
-     * @param numEncodedBytes the number of bytes in encoded form
-     * @param <E> the type of decoded object
+     * @param numEncodedBytes    the number of bytes in encoded form
+     * @param <E>                the type of decoded object
      * @return an {@link IteratorWithNumBytes}, with the information about the number of bytes
      */
     static <E> IteratorWithNumBytes<E> of(final Iterator<E> innerIterator,
@@ -432,6 +447,7 @@ public NumBytesNotSupportedException() {
     /**
      * This method should be called after the actual data is taken out of iterator,
      * since the existence of an iterator does not guarantee that data inside it is ready.
+     *
      * @return the number of bytes in serialized form (which is, for example, encoded and compressed)
      * @throws NumBytesNotSupportedException when the operation is not supported
      * @throws IllegalStateException         when the information is not ready
@@ -441,6 +457,7 @@ public NumBytesNotSupportedException() {
     /**
      * This method should be called after the actual data is taken out of iterator,
      * since the existence of an iterator does not guarantee that data inside it is ready.
+     *
      * @return the number of bytes in encoded form (which is ready to be decoded)
      * @throws NumBytesNotSupportedException when the operation is not supported
      * @throws IllegalStateException         when the information is not ready
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java
index 46c46092..5ea1beca 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java
@@ -15,23 +15,22 @@
  */
 package edu.snu.nemo.runtime.executor.data;
 
-import edu.snu.nemo.runtime.executor.data.streamchainer.CompressionStreamChainer;
-import edu.snu.nemo.runtime.executor.data.streamchainer.StreamChainer;
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
+import edu.snu.nemo.runtime.executor.data.streamchainer.*;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
-import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import javax.inject.Inject;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 /**
- * Mapping from RuntimeEdgeId to Coder.
+ * Mapping from RuntimeEdgeId to {@link Serializer}.
  */
 public final class SerializerManager {
   private static final Logger LOG = LoggerFactory.getLogger(SerializerManager.class.getName());
@@ -45,40 +44,53 @@ public SerializerManager() {
   }
 
   /**
-   * Register a coder for runtime edge.
+   * Register a encoderFactory for runtime edge.
+   * This method regards that compression & decompression property are empty.
    *
-   * @param runtimeEdgeId id of the runtime edge.
-   * @param coder         the corresponding coder.
+   * @param runtimeEdgeId  id of the runtime edge.
+   * @param encoderFactory the corresponding encoder factory.
+   * @param decoderFactory the corresponding decoder factory.
    */
   public void register(final String runtimeEdgeId,
-                       final Coder coder) {
-    register(runtimeEdgeId, coder, null);
+                       final EncoderFactory encoderFactory,
+                       final DecoderFactory decoderFactory) {
+    register(runtimeEdgeId, encoderFactory, decoderFactory, null, null);
   }
 
   /**
-   * Register a coder for runtime edge.
+   * Register a encoderFactory for runtime edge.
    *
-   * @param runtimeEdgeId id of the runtime edge.
-   * @param coder         the corresponding coder.
+   * @param runtimeEdgeId         id of the runtime edge.
+   * @param encoderFactory        the corresponding encoder factory.
+   * @param decoderFactory        the corresponding decoder factory.
    * @param compressionProperty   compression property, or null not to enable compression
+   * @param decompressionProperty decompression property, or null not to enable decompression
    */
   public void register(final String runtimeEdgeId,
-                       final Coder coder,
-                       final CompressionProperty.Value compressionProperty) {
+                       final EncoderFactory encoderFactory,
+                       final DecoderFactory decoderFactory,
+                       @Nullable final CompressionProperty.Value compressionProperty,
+                       @Nullable final CompressionProperty.Value decompressionProperty) {
     LOG.debug("{} edge id registering to SerializerManager", runtimeEdgeId);
-    final Serializer serializer = new Serializer(coder, Collections.emptyList());
-    runtimeEdgeIdToSerializer.putIfAbsent(runtimeEdgeId, serializer);
 
-    final List<StreamChainer> streamChainerList = new ArrayList<>();
+    final List<EncodeStreamChainer> encodeStreamChainers = new ArrayList<>();
+    final List<DecodeStreamChainer> decodeStreamChainers = new ArrayList<>();
 
     // Compression chain
     if (compressionProperty != null) {
       LOG.debug("Adding {} compression chain for {}",
           compressionProperty, runtimeEdgeId);
-      streamChainerList.add(new CompressionStreamChainer(compressionProperty));
+      encodeStreamChainers.add(new CompressionStreamChainer(compressionProperty));
+    }
+    if (decompressionProperty != null) {
+      LOG.debug("Adding {} decompression chain for {}",
+          decompressionProperty, runtimeEdgeId);
+      decodeStreamChainers.add(new DecompressionStreamChainer(decompressionProperty));
     }
 
-    serializer.setStreamChainers(streamChainerList);
+    final Serializer serializer =
+        new Serializer(encoderFactory, decoderFactory, encodeStreamChainers, decodeStreamChainers);
+    runtimeEdgeIdToSerializer.putIfAbsent(runtimeEdgeId, serializer);
   }
 
   /**
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
index 200e465d..d6d63d1e 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.executor.data.partition;
 
 import edu.snu.nemo.common.DirectByteArrayOutputStream;
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.EncoderFactory;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 
 import javax.annotation.Nullable;
@@ -39,7 +39,7 @@
   // Will be null when the partition is committed when it is constructed.
   @Nullable private final DirectByteArrayOutputStream bytesOutputStream;
   @Nullable private final OutputStream wrappedStream;
-  @Nullable private final Coder coder;
+  @Nullable private final EncoderFactory.Encoder encoder;
 
   /**
    * Creates a serialized {@link Partition} without actual data.
@@ -57,8 +57,8 @@ public SerializedPartition(final K key,
     this.length = 0;
     this.committed = false;
     this.bytesOutputStream = new DirectByteArrayOutputStream();
-    this.wrappedStream = buildOutputStream(bytesOutputStream, serializer.getStreamChainers());
-    this.coder = serializer.getCoder();
+    this.wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
+    this.encoder = serializer.getEncoderFactory().create(wrappedStream);
   }
 
   /**
@@ -81,7 +81,7 @@ public SerializedPartition(final K key,
     this.committed = true;
     this.bytesOutputStream = null;
     this.wrappedStream = null;
-    this.coder = null;
+    this.encoder = null;
   }
 
   /**
@@ -96,7 +96,7 @@ public void write(final Object element) throws IOException {
       throw new IOException("The partition is already committed!");
     } else {
       try {
-        coder.encode(element, wrappedStream);
+        encoder.encode(element);
         elementsCount++;
       } catch (final IOException e) {
         wrappedStream.close();
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
index c5c2e927..467ba181 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
@@ -17,19 +17,16 @@
 
 import edu.snu.nemo.common.exception.UnsupportedCompressionException;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
-import net.jpountz.lz4.LZ4BlockInputStream;
 import net.jpountz.lz4.LZ4BlockOutputStream;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
 /**
- * {@link StreamChainer} for applying compression.
+ * {@link EncodeStreamChainer} for applying compression.
  */
-public class CompressionStreamChainer implements StreamChainer {
+public class CompressionStreamChainer implements EncodeStreamChainer {
   private final CompressionProperty.Value compression;
 
   /**
@@ -41,18 +38,6 @@ public CompressionStreamChainer(final CompressionProperty.Value compression) {
     this.compression = compression;
   }
 
-  @Override
-  public final InputStream chainInput(final InputStream in) throws IOException {
-    switch (compression) {
-      case Gzip:
-        return new GZIPInputStream(in);
-      case LZ4:
-        return new LZ4BlockInputStream(in);
-      default:
-        throw new UnsupportedCompressionException("Not supported compression method");
-    }
-  }
-
   @Override
   public final OutputStream chainOutput(final OutputStream out) throws IOException {
     switch (compression) {
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecodeStreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecodeStreamChainer.java
new file mode 100644
index 00000000..6084b6e3
--- /dev/null
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecodeStreamChainer.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.executor.data.streamchainer;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link DecodeStreamChainer} object indicates each stream manipulation strategy.
+ * Stream can be chained by {@link DecodeStreamChainer} multiple times.
+ */
+public interface DecodeStreamChainer {
+
+  /**
+   * Chain {@link InputStream} and returns chained {@link InputStream}.
+   *
+   * @param in the stream which will be chained.
+   * @return chained {@link InputStream}.
+   * @throws IOException if fail to chain the stream.
+   */
+  InputStream chainInput(InputStream in) throws IOException;
+}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java
new file mode 100644
index 00000000..558bd357
--- /dev/null
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/DecompressionStreamChainer.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.executor.data.streamchainer;
+
+import edu.snu.nemo.common.exception.UnsupportedCompressionException;
+import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
+import net.jpountz.lz4.LZ4BlockInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * {@link DecodeStreamChainer} for applying compression.
+ */
+public class DecompressionStreamChainer implements DecodeStreamChainer {
+  private final CompressionProperty.Value compression;
+
+  /**
+   * Constructor.
+   *
+   * @param compression compression method.
+   */
+  public DecompressionStreamChainer(final CompressionProperty.Value compression) {
+    this.compression = compression;
+  }
+
+  @Override
+  public final InputStream chainInput(final InputStream in) throws IOException {
+    switch (compression) {
+      case Gzip:
+        return new GZIPInputStream(in);
+      case LZ4:
+        return new LZ4BlockInputStream(in);
+      default:
+        throw new UnsupportedCompressionException("Not supported compression method");
+    }
+  }
+}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/StreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/EncodeStreamChainer.java
similarity index 67%
rename from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/StreamChainer.java
rename to runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/EncodeStreamChainer.java
index 2dbeec73..b0c75479 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/StreamChainer.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/EncodeStreamChainer.java
@@ -16,22 +16,13 @@
 package edu.snu.nemo.runtime.executor.data.streamchainer;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 
 /**
- * A {@link StreamChainer} object indicates each stream manipulation strategy.
- * Stream can be chained by {@link StreamChainer} multiple times.
+ * A {@link EncodeStreamChainer} object indicates each stream manipulation strategy.
+ * Stream can be chained by {@link EncodeStreamChainer} multiple times.
  */
-public interface StreamChainer {
-  /**
-   * Chain {@link InputStream} and returns chained {@link InputStream}.
-   *
-   * @param in the stream which will be chained.
-   * @return chained {@link InputStream}.
-   * @throws IOException if fail to chain the stream.
-   */
-  InputStream chainInput(InputStream in) throws IOException;
+public interface EncodeStreamChainer {
 
   /**
    * Chain {@link OutputStream} and returns chained {@link OutputStream}.
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/Serializer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/Serializer.java
index d79d28f4..e30147a0 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/Serializer.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/Serializer.java
@@ -15,53 +15,65 @@
  */
 package edu.snu.nemo.runtime.executor.data.streamchainer;
 
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
 
 import java.util.List;
 
 /**
- * class that contains {@link Coder} and {@link List} of {@link StreamChainer}.
- * @param <T> coder element type.
+ * class that contains {@link EncoderFactory}, {@link DecoderFactory} and {@link List} of {@link EncodeStreamChainer}.
+ * @param <E> encoderFactory element type.
+ * @param <D> decoderFactory element type.
  */
-public final class Serializer<T> {
-  private Coder<T> coder;
-  private List<StreamChainer> streamChainers;
+public final class Serializer<E, D> {
+  private final EncoderFactory<E> encoderFactory;
+  private final DecoderFactory<D> decoderFactory;
+  private final List<EncodeStreamChainer> encodeStreamChainers;
+  private final List<DecodeStreamChainer> decodeStreamChainers;
 
   /**
    * Constructor.
    *
-   * @param coder      {@link Coder}.
-   * @param streamChainers list of {@link StreamChainer}.
+   * @param encoderFactory              {@link EncoderFactory}.
+   * @param decoderFactory              {@link DecoderFactory}.
+   * @param encodeStreamChainers the list of {@link EncodeStreamChainer} to use for encoding.
+   * @param decodeStreamChainers the list of {@link DecodeStreamChainer} to use for decoding.
    */
-  public Serializer(final Coder<T> coder, final List<StreamChainer> streamChainers) {
-    this.coder = coder;
-    this.streamChainers = streamChainers;
+  public Serializer(final EncoderFactory<E> encoderFactory,
+                    final DecoderFactory<D> decoderFactory,
+                    final List<EncodeStreamChainer> encodeStreamChainers,
+                    final List<DecodeStreamChainer> decodeStreamChainers) {
+    this.encoderFactory = encoderFactory;
+    this.decoderFactory = decoderFactory;
+    this.encodeStreamChainers = encodeStreamChainers;
+    this.decodeStreamChainers = decodeStreamChainers;
   }
 
   /**
-   * method that returns {@link Coder}.
-   *
-   * @return {@link Coder}.
+   * @return the {@link EncoderFactory} to use.
    */
-  public Coder<T> getCoder() {
-    return coder;
+  public EncoderFactory<E> getEncoderFactory() {
+    return encoderFactory;
   }
 
   /**
-   * method that returns list of {@link StreamChainer}.
-   *
-   * @return list of {@link StreamChainer}.
+   * @return the {@link DecoderFactory} to use.
    */
-  public List<StreamChainer> getStreamChainers() {
-    return streamChainers;
+  public DecoderFactory<D> getDecoderFactory() {
+    return decoderFactory;
   }
 
   /**
-   * method that sets list of {@link StreamChainer}.
-   *
-   * @param streamChainers list of {@link StreamChainer}.
+   * @return the list of {@link EncodeStreamChainer} for encoding.
+   */
+  public List<EncodeStreamChainer> getEncodeStreamChainers() {
+    return encodeStreamChainers;
+  }
+
+  /**
+   * @return the list of {@link EncodeStreamChainer} for decoding.
    */
-  public void setStreamChainers(final List<StreamChainer> streamChainers) {
-    this.streamChainers = streamChainers;
+  public List<DecodeStreamChainer> getDecodeStreamChainers() {
+    return decodeStreamChainers;
   }
 }
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
index 74c25048..b9f0c63f 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
@@ -16,11 +16,9 @@
 package edu.snu.nemo.runtime.executor.data;
 
 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.coder.IntCoder;
-import edu.snu.nemo.common.coder.PairCoder;
+import edu.snu.nemo.common.coder.*;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.data.HashRange;
 import edu.snu.nemo.runtime.common.data.KeyRange;
@@ -30,6 +28,7 @@
 import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.executor.data.block.Block;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
+import edu.snu.nemo.runtime.executor.data.streamchainer.DecompressionStreamChainer;
 import edu.snu.nemo.runtime.executor.data.streamchainer.CompressionStreamChainer;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import edu.snu.nemo.runtime.executor.data.stores.*;
@@ -71,9 +70,11 @@
 @PrepareForTest({BlockManagerMaster.class, RuntimeMaster.class, SerializerManager.class})
 public final class BlockStoreTest {
   private static final String TMP_FILE_DIRECTORY = "./tmpFiles";
-  private static final Coder CODER = PairCoder.of(IntCoder.of(), IntCoder.of());
-  private static final Serializer SERIALIZER = new Serializer(CODER,
-      Collections.singletonList(new CompressionStreamChainer(CompressionProperty.Value.LZ4)));
+  private static final Serializer SERIALIZER = new Serializer(
+      PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of()),
+      PairDecoderFactory.of(IntDecoderFactory.of(), IntDecoderFactory.of()),
+      Collections.singletonList(new CompressionStreamChainer(CompressionProperty.Value.LZ4)),
+      Collections.singletonList(new DecompressionStreamChainer(CompressionProperty.Value.LZ4)));
   private static final SerializerManager serializerManager = mock(SerializerManager.class);
   private BlockManagerMaster blockManagerMaster;
   private LocalMessageDispatcher messageDispatcher;
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 012701cf..0b119d86 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -15,8 +15,7 @@
  */
 package edu.snu.nemo.runtime.executor.datatransfer;
 
-import edu.snu.nemo.common.coder.IntCoder;
-import edu.snu.nemo.common.coder.PairCoder;
+import edu.snu.nemo.common.coder.*;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.*;
@@ -26,7 +25,6 @@
 import edu.snu.nemo.common.test.EmptyComponents;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
@@ -43,9 +41,6 @@
 import edu.snu.nemo.runtime.executor.MetricManagerWorker;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
-import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
-import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
-import edu.snu.nemo.runtime.executor.datatransfer.OutputWriter;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
@@ -105,7 +100,9 @@
   private static final int PARALLELISM_TEN = 10;
   private static final String EDGE_PREFIX_TEMPLATE = "Dummy(%d)";
   private static final AtomicInteger TEST_INDEX = new AtomicInteger(0);
-  private static final Coder CODER = PairCoder.of(IntCoder.of(), IntCoder.of());
+  private static final EncoderFactory ENCODER_FACTORY = PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of());
+  private static final DecoderFactory DECODER_FACTORY =
+      PairDecoderFactory.of(IntDecoderFactory.of(), IntDecoderFactory.of());
   private static final Tang TANG = Tang.Factory.getTang();
   private static final int HASH_RANGE_MULTIPLIER = 10;
 
@@ -307,14 +304,14 @@ private void writeAndRead(final BlockManagerWorker sender,
 
     // Edge setup
     final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex);
-    dummyIREdge.setProperty(CoderProperty.of(CODER));
     dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
+    dummyIREdge.setProperty(DataCommunicationPatternProperty.of(commPattern));
+    dummyIREdge.setProperty(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
+    dummyIREdge.setProperty(DataStoreProperty.of(store));
+    dummyIREdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
+    dummyIREdge.setProperty(EncoderProperty.of(ENCODER_FACTORY));
+    dummyIREdge.setProperty(DecoderProperty.of(DECODER_FACTORY));
     final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
-    edgeProperties.put(DataCommunicationPatternProperty.of(commPattern));
-    edgeProperties.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
-    edgeProperties.put(DataStoreProperty.of(store));
-    edgeProperties.put(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
-    edgeProperties.put(CoderProperty.of(CODER));
     final RuntimeEdge dummyEdge;
 
     final IRVertex srcMockVertex = mock(IRVertex.class);
@@ -398,20 +395,20 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
 
     // Edge setup
     final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex);
-    dummyIREdge.setProperty(CoderProperty.of(CODER));
+    dummyIREdge.setProperty(EncoderProperty.of(ENCODER_FACTORY));
+    dummyIREdge.setProperty(DecoderProperty.of(DECODER_FACTORY));
     dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
-    final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
-    edgeProperties.put(DataCommunicationPatternProperty.of(commPattern));
-    edgeProperties.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
-    edgeProperties.put(DuplicateEdgeGroupProperty.of(new DuplicateEdgeGroupPropertyValue("dummy")));
+    dummyIREdge.setProperty(DataCommunicationPatternProperty.of(commPattern));
+    dummyIREdge.setProperty(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
+    dummyIREdge.setProperty(DuplicateEdgeGroupProperty.of(new DuplicateEdgeGroupPropertyValue("dummy")));
     final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty
-        = edgeProperties.get(DuplicateEdgeGroupProperty.class);
+        = dummyIREdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
     duplicateDataProperty.get().setRepresentativeEdgeId(edgeId);
     duplicateDataProperty.get().setGroupSize(2);
-
-    edgeProperties.put(DataStoreProperty.of(store));
-    edgeProperties.put(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
+    dummyIREdge.setProperty(DataStoreProperty.of(store));
+    dummyIREdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
     final RuntimeEdge dummyEdge, dummyEdge2;
+    final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
 
     final IRVertex srcMockVertex = mock(IRVertex.class);
     final IRVertex dstMockVertex = mock(IRVertex.class);
@@ -523,8 +520,8 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
   private Pair<IRVertex, IRVertex> setupVertices(final String edgeId,
                                                  final BlockManagerWorker sender,
                                                  final BlockManagerWorker receiver) {
-    serializerManagers.get(sender).register(edgeId, CODER);
-    serializerManagers.get(receiver).register(edgeId, CODER);
+    serializerManagers.get(sender).register(edgeId, ENCODER_FACTORY, DECODER_FACTORY);
+    serializerManagers.get(receiver).register(edgeId, ENCODER_FACTORY, DECODER_FACTORY);
 
     // Src setup
     final SourceVertex srcVertex = new EmptyComponents.EmptySourceVertex("Source");
@@ -543,10 +540,10 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
                                                  final String edgeId2,
                                                  final BlockManagerWorker sender,
                                                  final BlockManagerWorker receiver) {
-    serializerManagers.get(sender).register(edgeId, CODER);
-    serializerManagers.get(receiver).register(edgeId, CODER);
-    serializerManagers.get(sender).register(edgeId2, CODER);
-    serializerManagers.get(receiver).register(edgeId2, CODER);
+    serializerManagers.get(sender).register(edgeId, ENCODER_FACTORY, DECODER_FACTORY);
+    serializerManagers.get(receiver).register(edgeId, ENCODER_FACTORY, DECODER_FACTORY);
+    serializerManagers.get(sender).register(edgeId2, ENCODER_FACTORY, DECODER_FACTORY);
+    serializerManagers.get(receiver).register(edgeId2, ENCODER_FACTORY, DECODER_FACTORY);
 
     // Src setup
     final SourceVertex srcVertex = new EmptyComponents.EmptySourceVertex("Source");
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index 55d1d3f6..998f3aaa 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -17,7 +17,6 @@
 
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.ir.OutputCollector;
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.Readable;
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
index b403f7b5..002873c7 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
@@ -16,13 +16,16 @@
 package edu.snu.nemo.tests.compiler.optimizer.pass.compiletime.annotating;
 
 import edu.snu.nemo.client.JobLauncher;
-import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
-import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeCoderPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeDecoderPass;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeEncoderPass;
 import edu.snu.nemo.tests.compiler.CompilerTestUtil;
 import org.junit.Before;
 import org.junit.Test;
@@ -33,7 +36,7 @@
 import static org.junit.Assert.assertEquals;
 
 /**
- * Test {@link edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeCoderPass}.
+ * Test {@link DefaultEdgeEncoderPass} and {@link DefaultEdgeDecoderPass}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
@@ -47,33 +50,43 @@ public void setUp() throws Exception {
 
   @Test
   public void testAnnotatingPass() {
-    final AnnotatingPass coderPass = new DefaultEdgeCoderPass();
-    assertEquals(CoderProperty.class, coderPass.getExecutionPropertyToModify());
+    final AnnotatingPass encoderPass = new DefaultEdgeEncoderPass();
+    assertEquals(EncoderProperty.class, encoderPass.getExecutionPropertyToModify());
+    final AnnotatingPass decoderPass = new DefaultEdgeDecoderPass();
+    assertEquals(DecoderProperty.class, decoderPass.getExecutionPropertyToModify());
   }
 
   @Test
   public void testNotOverride() {
     // Get the first coder from the compiled DAG
-    final Coder compiledCoder = compiledDAG
-        .getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0)).get(0).getPropertyValue(CoderProperty.class).get();
-    final DAG<IRVertex, IREdge> processedDAG = new DefaultEdgeCoderPass().apply(compiledDAG);
+    final IREdge irEdge = compiledDAG.getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0)).get(0);
+    final EncoderFactory compiledEncoderFactory = irEdge.getPropertyValue(EncoderProperty.class).get();
+    final DecoderFactory compiledDecoderFactory = irEdge.getPropertyValue(DecoderProperty.class).get();
+    DAG<IRVertex, IREdge> processedDAG = new DefaultEdgeEncoderPass().apply(compiledDAG);
+    processedDAG = new DefaultEdgeDecoderPass().apply(processedDAG);
 
     // Get the first coder from the processed DAG
-    final Coder processedCoder = processedDAG.getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0))
-        .get(0).getPropertyValue(CoderProperty.class).get();
-    assertEquals(compiledCoder, processedCoder); // It must not be changed.
+    final IREdge processedIREdge = processedDAG.getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0)).get(0);
+    final EncoderFactory processedEncoderFactory = processedIREdge.getPropertyValue(EncoderProperty.class).get();
+    assertEquals(compiledEncoderFactory, processedEncoderFactory); // It must not be changed.
+    final DecoderFactory processedDecoderFactory = processedIREdge.getPropertyValue(DecoderProperty.class).get();
+    assertEquals(compiledDecoderFactory, processedDecoderFactory); // It must not be changed.
   }
 
   @Test
   public void testSetToDefault() throws Exception {
     // Remove the first coder from the compiled DAG (to let our pass to set as default coder).
-    compiledDAG.getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0))
-        .get(0).getExecutionProperties().remove(CoderProperty.class);
-    final DAG<IRVertex, IREdge> processedDAG = new DefaultEdgeCoderPass().apply(compiledDAG);
+    final IREdge irEdge = compiledDAG.getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0)).get(0);
+    irEdge.getExecutionProperties().remove(EncoderProperty.class);
+    irEdge.getExecutionProperties().remove(DecoderProperty.class);
+    DAG<IRVertex, IREdge> processedDAG = new DefaultEdgeEncoderPass().apply(compiledDAG);
+    processedDAG = new DefaultEdgeDecoderPass().apply(processedDAG);
 
-    // Check whether the pass set the empty coder to our default coder.
-    final Coder processedCoder = processedDAG.getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0))
-        .get(0).getPropertyValue(CoderProperty.class).get();
-    assertEquals(Coder.DUMMY_CODER, processedCoder);
+    // Check whether the pass set the empty coder to our default encoder & decoder.
+    final IREdge processedIREdge = processedDAG.getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0)).get(0);
+    final EncoderFactory processedEncoderFactory = processedIREdge.getPropertyValue(EncoderProperty.class).get();
+    final DecoderFactory processedDecoderFactory = processedIREdge.getPropertyValue(DecoderProperty.class).get();
+    assertEquals(EncoderFactory.DUMMY_ENCODER_FACTORY, processedEncoderFactory);
+    assertEquals(DecoderFactory.DUMMY_DECODER_FACTORY, processedDecoderFactory);
   }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
index 8a6b1a85..d2d6954d 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
@@ -20,7 +20,8 @@
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping.LoopExtractionPass;
@@ -114,13 +115,15 @@ private static void addLoopVertexToBuilder(final DAGBuilder<IRVertex, IREdge> bu
     loopVertexToFollow.getIterativeIncomingEdges().values().forEach(irEdges -> irEdges.forEach(irEdge -> {
       final IREdge newIREdge = new IREdge(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
           vertexToBeFollowed, loopVertexToFollow);
-      newIREdge.setProperty(CoderProperty.of(irEdge.getPropertyValue(CoderProperty.class).get()));
+      newIREdge.setProperty(EncoderProperty.of(irEdge.getPropertyValue(EncoderProperty.class).get()));
+      newIREdge.setProperty(DecoderProperty.of(irEdge.getPropertyValue(DecoderProperty.class).get()));
       builder.connectVertices(newIREdge);
     }));
     loopVertexToFollow.getNonIterativeIncomingEdges().values().forEach(irEdges -> irEdges.forEach(irEdge -> {
       final IREdge newIREdge = new IREdge(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
           irEdge.getSrc(), loopVertexToFollow);
-      newIREdge.setProperty(CoderProperty.of(irEdge.getPropertyValue(CoderProperty.class).get()));
+      newIREdge.setProperty(EncoderProperty.of(irEdge.getPropertyValue(EncoderProperty.class).get()));
+      newIREdge.setProperty(DecoderProperty.of(irEdge.getPropertyValue(DecoderProperty.class).get()));
       builder.connectVertices(newIREdge);
     }));
   }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
index de6ff56f..a461873d 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
@@ -20,7 +20,8 @@
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping.LoopExtractionPass;
@@ -92,7 +93,8 @@ public void setUp() throws Exception {
             assertTrue(incomingEdge.isPresent());
             final IREdge newIREdge = new IREdge(incomingEdge.get().getPropertyValue(
                 DataCommunicationPatternProperty.class).get(), incomingEdge.get().getSrc(), alsLoop);
-            newIREdge.setProperty(CoderProperty.of(incomingEdge.get().getPropertyValue(CoderProperty.class).get()));
+            newIREdge.setProperty(EncoderProperty.of(incomingEdge.get().getPropertyValue(EncoderProperty.class).get()));
+            newIREdge.setProperty(DecoderProperty.of(incomingEdge.get().getPropertyValue(DecoderProperty.class).get()));
             builder.connectVertices(newIREdge);
           }
         });
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
index 793c1827..8d13374b 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -29,21 +29,21 @@
   @Test
   public void testDisaggregationPolicy() {
     final Policy disaggregationPolicy = new DisaggregationPolicy();
-    assertEquals(13, disaggregationPolicy.getCompileTimePasses().size());
+    assertEquals(15, disaggregationPolicy.getCompileTimePasses().size());
     assertEquals(0, disaggregationPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testPadoPolicy() {
     final Policy padoPolicy = new PadoPolicy();
-    assertEquals(15, padoPolicy.getCompileTimePasses().size());
+    assertEquals(17, padoPolicy.getCompileTimePasses().size());
     assertEquals(0, padoPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testDataSkewPolicy() {
     final Policy dataSkewPolicy = new DataSkewPolicy();
-    assertEquals(17, dataSkewPolicy.getCompileTimePasses().size());
+    assertEquals(19, dataSkewPolicy.getCompileTimePasses().size());
     assertEquals(1, dataSkewPolicy.getRuntimePasses().size());
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services