Posted to commits@nemo.apache.org by jo...@apache.org on 2018/07/03 07:46:36 UTC

[incubator-nemo] branch master updated: [NEMO-125] Fix data loss bug caused by SailfishSchedulingPolicy (#68)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new c3bd780  [NEMO-125] Fix data loss bug caused by SailfishSchedulingPolicy (#68)
c3bd780 is described below

commit c3bd780af7062b380cde4a291e8f978f3da90ad9
Author: Sanha Lee <sa...@gmail.com>
AuthorDate: Tue Jul 3 16:46:34 2018 +0900

    [NEMO-125] Fix data loss bug caused by SailfishSchedulingPolicy (#68)
    
    JIRA: [NEMO-125: Fix data loss bug caused by SailfishSchedulingPolicy](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-125)
    
    **Major changes:**
    - Let `InputStreamIterator` in `DataUtil` use the size in bytes of each partition, instead of the number of elements, to limit the data to be read (see the sketch after this list).
      - This "limiting" behavior is needed to avoid reading padding bytes introduced by compression or by other `Stream` implementations (such as `ByteArrayOutputStream`).
    - Remove the number of elements in a partition from partition metadata.
    - Re-enable byte array coder optimization in `SailfishPass`.
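    
    To illustrate the byte-size limiting above, here is a minimal, self-contained sketch (the class and method names are made up for illustration; this is not Nemo's actual `InputStreamIterator` or serializer chain). It writes two independently-compressed partitions back-to-back and shows that knowing the first partition's size in bytes lets a reader decode it without consuming the second partition's bytes:
    
    ```java
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.zip.DeflaterOutputStream;
    import java.util.zip.InflaterInputStream;
    
    public final class ByteLimitedReadSketch {
      public static void main(final String[] args) throws IOException {
        // Two compressed "partitions" written back-to-back into one buffer.
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        final int firstPartitionSize = writePartition(buffer, 1, 2, 3);
        writePartition(buffer, 4, 5);
    
        // An element-count limit cannot tell where partition #1 ends, because the
        // compressed stream contains framing bytes beyond the encoded elements.
        // A byte-size limit can: hand the reader exactly `firstPartitionSize` bytes.
        final byte[] all = buffer.toByteArray();
        final byte[] firstOnly = Arrays.copyOfRange(all, 0, firstPartitionSize);
        System.out.println(readPartition(firstOnly)); // prints [1, 2, 3]
      }
    
      /** Writes one independently-compressed partition; returns its size in bytes. */
      private static int writePartition(final ByteArrayOutputStream out, final int... elements)
          throws IOException {
        final int before = out.size();
        final DataOutputStream data = new DataOutputStream(new DeflaterOutputStream(out));
        for (final int element : elements) {
          data.writeInt(element);
        }
        data.close(); // finishes the deflate stream; its trailer bytes land in `out`
        return out.size() - before;
      }
    
      /** Decodes ints from one partition's bytes until the stream is exhausted. */
      private static List<Integer> readPartition(final byte[] partitionBytes) throws IOException {
        final DataInputStream data = new DataInputStream(
            new InflaterInputStream(new ByteArrayInputStream(partitionBytes)));
        final List<Integer> elements = new ArrayList<>();
        while (true) {
          try {
            elements.add(data.readInt());
          } catch (final EOFException endOfPartition) {
            return elements; // EOF marks the end of this partition
          }
        }
      }
    }
    ```
    
    Counting decoded elements instead would stop the iterator at the right element, but leave the stream position inside the compression framing; the next partition's read would then start at the wrong offset, which is the data loss this PR fixes.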
    
    **Minor changes to note:**
    - Make `DataTransferFactory` constructible only through Tang, instead of through a public constructor (the private `@Inject` constructor pattern is sketched below this list).
    - Make `OutputWriter` constructible only through `DataTransferFactory`.
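    
    A minimal sketch of the Tang-only construction pattern, under the assumption that this is all the reader needs to see (the class name below is hypothetical; in this PR the real class is `DataTransferFactory`, whose `@Inject` constructor becomes private):
    
    ```java
    import javax.inject.Inject;
    import org.apache.reef.tang.Tang;
    import org.apache.reef.tang.exceptions.InjectionException;
    
    /** Hypothetical class; it only mirrors the constructor shape of DataTransferFactory. */
    public final class TangOnlyFactory {
      @Inject
      private TangOnlyFactory() {
        // Private + @Inject: `new TangOnlyFactory()` does not compile outside
        // this class, but Tang can still instantiate it reflectively.
      }
    
      public static void main(final String[] args) throws InjectionException {
        final TangOnlyFactory factory =
            Tang.Factory.getTang().newInjector().getInstance(TangOnlyFactory.class);
        System.out.println("Constructed via Tang: " + factory);
      }
    }
    ```
    
    This keeps every `DataTransferFactory` (and, through it, every `OutputWriter`) wired with the same injected dependencies, rather than hand-built with ad-hoc arguments as in the old `DataTransferTest`.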
    
    **Tests for the changes:**
    - Existing data plane unit tests cover this change.
    - Add `testSailfishInOneExecutor` in `WordCountITCase`.
      - This test reproduces the single-executor resource environment in which the data loss occurs, and it fails without the change in this PR.
    
    **Other comments:**
    - N/A.
    
    resolves [NEMO-125](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-125)
---
 .../pass/compiletime/composite/SailfishPass.java   |  5 +--
 .../snu/nemo/examples/beam/WordCountITCase.java    | 14 ++++++-
 .../beam_sample_one_executor_resources.json        |  7 ++++
 .../snu/nemo/runtime/executor/data/DataUtil.java   | 49 ++++++++++------------
 .../runtime/executor/data/block/FileBlock.java     |  7 ++--
 .../executor/data/metadata/FileMetadata.java       |  6 +--
 .../executor/data/metadata/PartitionMetadata.java  | 15 +------
 .../executor/data/metadata/RemoteFileMetadata.java |  2 -
 .../data/partition/SerializedPartition.java        | 18 --------
 .../executor/datatransfer/DataTransferFactory.java |  6 +--
 .../executor/datatransfer/OutputWriter.java        | 12 +++---
 .../executor/datatransfer/DataTransferTest.java    | 47 ++++++++++++---------
 .../compiletime/composite/SailfishPassTest.java    | 10 ++---
 13 files changed, 90 insertions(+), 108 deletions(-)

diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
index ab1ece1..d5c50e4 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
@@ -32,9 +32,8 @@ public final class SailfishPass extends CompositePass {
         new SailfishRelayReshapingPass(),
         new SailfishEdgeDataFlowModelPass(),
         new SailfishEdgeDataStorePass(),
-        // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
-        // new SailfishEdgeDecoderPass(),
-        // new SailfishEdgeEncoderPass(),
+        new SailfishEdgeDecoderPass(),
+        new SailfishEdgeEncoderPass(),
         new SailfishEdgeUsedDataHandlingPass()
     ));
   }
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
index 507d365..0ffba2e 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
@@ -40,13 +40,13 @@ public final class WordCountITCase {
   private static final String outputFileName = "sample_output_wordcount";
   private static final String testResourceFileName = "test_output_wordcount";
   private static final String executorResourceFileName = fileBasePath + "beam_sample_executor_resources.json";
+  private static final String oneExecutorResourceFileName = fileBasePath + "beam_sample_one_executor_resources.json";
   private static final String inputFilePath =  fileBasePath + inputFileName;
   private static final String outputFilePath =  fileBasePath + outputFileName;
 
   @Before
   public void setUp() throws Exception {
     builder = new ArgBuilder()
-        .addResourceJson(executorResourceFileName)
         .addUserMain(WordCount.class.getCanonicalName())
         .addUserArgs(inputFilePath, outputFilePath);
   }
@@ -63,6 +63,7 @@ public final class WordCountITCase {
   @Test (timeout = TIMEOUT)
   public void test() throws Exception {
     JobLauncher.main(builder
+        .addResourceJson(executorResourceFileName)
         .addJobId(WordCountITCase.class.getSimpleName())
         .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
         .build());
@@ -71,14 +72,25 @@ public final class WordCountITCase {
   @Test (timeout = TIMEOUT)
   public void testSailfish() throws Exception {
     JobLauncher.main(builder
+        .addResourceJson(executorResourceFileName)
         .addJobId(WordCountITCase.class.getSimpleName() + "_sailfish")
         .addOptimizationPolicy(SailfishPolicyParallelismFive.class.getCanonicalName())
         .build());
   }
 
   @Test (timeout = TIMEOUT)
+  public void testSailfishInOneExecutor() throws Exception {
+    JobLauncher.main(builder
+        .addResourceJson(oneExecutorResourceFileName)
+        .addJobId(WordCountITCase.class.getSimpleName() + "_sailfishInOneExecutor")
+        .addOptimizationPolicy(SailfishPolicyParallelismFive.class.getCanonicalName())
+        .build());
+  }
+
+  @Test (timeout = TIMEOUT)
   public void testPado() throws Exception {
     JobLauncher.main(builder
+        .addResourceJson(executorResourceFileName)
         .addJobId(WordCountITCase.class.getSimpleName() + "_pado")
         .addOptimizationPolicy(PadoPolicyParallelismFive.class.getCanonicalName())
         .build());
diff --git a/examples/resources/beam_sample_one_executor_resources.json b/examples/resources/beam_sample_one_executor_resources.json
new file mode 100644
index 0000000..069ed97
--- /dev/null
+++ b/examples/resources/beam_sample_one_executor_resources.json
@@ -0,0 +1,7 @@
+[
+  {
+    "type": "Transient",
+    "memory_mb": 512,
+    "capacity": 5
+  }
+]
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
index 69eb9ba..50c43d9 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
@@ -49,44 +49,39 @@ public final class DataUtil {
   /**
    * Serializes the elements in a non-serialized partition into an output stream.
    *
-   * @param encoderFactory                the encoderFactory to encode the elements.
+   * @param encoderFactory         the encoderFactory to encode the elements.
    * @param nonSerializedPartition the non-serialized partition to serialize.
    * @param bytesOutputStream      the output stream to write.
-   * @return total number of elements in the partition.
    * @throws IOException if fail to serialize.
    */
-  public static long serializePartition(final EncoderFactory encoderFactory,
-                                        final NonSerializedPartition nonSerializedPartition,
-                                        final OutputStream bytesOutputStream) throws IOException {
-    long elementsCount = 0;
+  private static void serializePartition(final EncoderFactory encoderFactory,
+                                         final NonSerializedPartition nonSerializedPartition,
+                                         final OutputStream bytesOutputStream) throws IOException {
     final EncoderFactory.Encoder encoder = encoderFactory.create(bytesOutputStream);
     for (final Object element : nonSerializedPartition.getData()) {
       encoder.encode(element);
-      elementsCount++;
     }
-
-    return elementsCount;
   }
 
   /**
    * Reads the data of a partition from an input stream and deserializes it.
    *
-   * @param elementsInPartition the number of elements in this partition.
-   * @param serializer          the serializer to decode the bytes.
-   * @param key                 the key value of the result partition.
-   * @param inputStream         the input stream which will return the data in the partition as bytes.
-   * @param <K>                 the key type of the partitions.
+   * @param partitionSize the size of the partition to deserialize.
+   * @param serializer    the serializer to decode the bytes.
+   * @param key           the key value of the result partition.
+   * @param inputStream   the input stream which will return the data in the partition as bytes.
+   * @param <K>           the key type of the partitions.
    * @return the list of deserialized elements.
    * @throws IOException if fail to deserialize.
    */
-  public static <K extends Serializable> NonSerializedPartition deserializePartition(final long elementsInPartition,
+  public static <K extends Serializable> NonSerializedPartition deserializePartition(final int partitionSize,
                                                                                      final Serializer serializer,
                                                                                      final K key,
                                                                                      final InputStream inputStream)
       throws IOException {
     final List deserializedData = new ArrayList();
     final InputStreamIterator iterator = new InputStreamIterator(Collections.singletonList(inputStream).iterator(),
-        serializer, elementsInPartition);
+        serializer, partitionSize);
     iterator.forEachRemaining(deserializedData::add);
     return new NonSerializedPartition(key, deserializedData, iterator.getNumSerializedBytes(),
         iterator.getNumEncodedBytes());
@@ -111,15 +106,14 @@ public final class DataUtil {
           final DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream();
           final OutputStream wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
       ) {
-        final long elementsTotal =
-            serializePartition(serializer.getEncoderFactory(), partitionToConvert, wrappedStream);
+        serializePartition(serializer.getEncoderFactory(), partitionToConvert, wrappedStream);
         // We need to close wrappedStream on here, because DirectByteArrayOutputStream:getBufDirectly() returns
         // inner buffer directly, which can be an unfinished(not flushed) buffer.
         wrappedStream.close();
         final byte[] serializedBytes = bytesOutputStream.getBufDirectly();
         final int actualLength = bytesOutputStream.getCount();
         serializedPartitions.add(
-            new SerializedPartition<>(partitionToConvert.getKey(), elementsTotal, serializedBytes, actualLength));
+            new SerializedPartition<>(partitionToConvert.getKey(), serializedBytes, actualLength));
       }
     }
     return serializedPartitions;
@@ -144,7 +138,7 @@ public final class DataUtil {
       try (final ByteArrayInputStream byteArrayInputStream =
                new ByteArrayInputStream(partitionToConvert.getData())) {
         final NonSerializedPartition<K> deserializePartition = deserializePartition(
-            partitionToConvert.getElementsCount(), serializer, key, byteArrayInputStream);
+            partitionToConvert.getLength(), serializer, key, byteArrayInputStream);
         nonSerializedPartitions.add(deserializePartition);
       }
     }
@@ -211,7 +205,6 @@ public final class DataUtil {
     private volatile T next;
     private volatile boolean cannotContinueDecoding = false;
     private volatile DecoderFactory.Decoder<T> decoder = null;
-    private volatile long elementsDecoded = 0;
     private volatile long numSerializedBytes = 0;
     private volatile long numEncodedBytes = 0;
 
@@ -221,8 +214,8 @@ public final class DataUtil {
      * @param inputStreams The streams to read data from.
      * @param serializer   The serializer.
      */
-    public InputStreamIterator(final Iterator<InputStream> inputStreams,
-                               final Serializer<?, T> serializer) {
+    InputStreamIterator(final Iterator<InputStream> inputStreams,
+                        final Serializer<?, T> serializer) {
       this.inputStreams = inputStreams;
       this.serializer = serializer;
       // -1 means no limit.
@@ -234,12 +227,12 @@ public final class DataUtil {
      *
      * @param inputStreams The streams to read data from.
      * @param serializer   The serializer.
-     * @param limit        The number of elements from the {@link InputStream}.
+     * @param limit        The bytes to read from the {@link InputStream}.
      */
-    public InputStreamIterator(
+    private InputStreamIterator(
         final Iterator<InputStream> inputStreams,
         final Serializer<?, T> serializer,
-        final long limit) {
+        final int limit) {
       if (limit < 0) {
         throw new IllegalArgumentException("Negative limit not allowed.");
       }
@@ -256,7 +249,8 @@ public final class DataUtil {
       if (cannotContinueDecoding) {
         return false;
       }
-      if (limit != -1 && limit == elementsDecoded) {
+      if (limit != -1 && limit == (serializedCountingStream == null
+          ? numSerializedBytes : numSerializedBytes + serializedCountingStream.getCount())) {
         cannotContinueDecoding = true;
         return false;
       }
@@ -280,7 +274,6 @@ public final class DataUtil {
         try {
           next = decoder.decode();
           hasNext = true;
-          elementsDecoded++;
           return true;
         } catch (final IOException e) {
           // IOException from decoder indicates EOF event.
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index 9a64d86..41f125b 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -78,8 +78,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
     try (final FileOutputStream fileOutputStream = new FileOutputStream(filePath, true)) {
       for (final SerializedPartition<K> serializedPartition : serializedPartitions) {
         // Reserve a partition write and get the metadata.
-        metadata.writePartitionMetadata(
-            serializedPartition.getKey(), serializedPartition.getLength(), serializedPartition.getElementsCount());
+        metadata.writePartitionMetadata(serializedPartition.getKey(), serializedPartition.getLength());
         fileOutputStream.write(serializedPartition.getData(), 0, serializedPartition.getLength());
       }
     }
@@ -187,7 +186,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
                   new LimitedInputStream(fileStream, partitionMetadata.getPartitionSize());
               final NonSerializedPartition<K> deserializePartition =
                   DataUtil.deserializePartition(
-                      partitionMetadata.getElementsTotal(), serializer, key, limitedInputStream);
+                      partitionMetadata.getPartitionSize(), serializer, key, limitedInputStream);
               deserializedPartitions.add(deserializePartition);
               // rearrange file pointer
               final long toSkip = partitionMetadata.getPartitionSize() - availableBefore + fileStream.available();
@@ -237,7 +236,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
                 throw new IOException("The read data size does not match with the partition size.");
               }
               partitionsInRange.add(new SerializedPartition<>(
-                  key, partitionmetadata.getElementsTotal(), serializedData, serializedData.length));
+                  key, serializedData, serializedData.length));
             } else {
               // Have to skip this partition.
               skipBytes(fileStream, partitionmetadata.getPartitionSize());
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
index 3eafe8e..108a4e7 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
@@ -58,18 +58,16 @@ public abstract class FileMetadata<K extends Serializable> {
    *
    * @param key     the key of the partition.
    * @param partitionSize the size of the partition.
-   * @param elementsTotal the number of elements in the partition.
    * @throws IOException if fail to append the partition metadata.
    */
   public final synchronized void writePartitionMetadata(final K key,
-                                                        final int partitionSize,
-                                                        final long elementsTotal) throws IOException {
+                                                        final int partitionSize) throws IOException {
     if (committed.get()) {
       throw new IOException("Cannot write a new block to a closed partition.");
     }
 
     final PartitionMetadata partitionMetadata =
-        new PartitionMetadata(key, partitionSize, writtenBytesCursor, elementsTotal);
+        new PartitionMetadata(key, partitionSize, writtenBytesCursor);
     partitionMetadataList.add(partitionMetadata);
     writtenBytesCursor += partitionSize;
   }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
index 9d68e5a..2fa9de2 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
@@ -25,7 +25,6 @@ public final class PartitionMetadata<K extends Serializable> {
   private final K key;
   private final int partitionSize;
   private final long offset;
-  private final long elementsTotal;
 
   /**
    * Constructor.
@@ -33,16 +32,13 @@ public final class PartitionMetadata<K extends Serializable> {
    * @param key           the key of this partition.
    * @param partitionSize the size of this partition.
    * @param offset        the offset of this partition.
-   * @param elementsTotal the total number of elements in this partition.
    */
   public PartitionMetadata(final K key,
                            final int partitionSize,
-                           final long offset,
-                           final long elementsTotal) {
+                           final long offset) {
     this.key = key;
     this.partitionSize = partitionSize;
     this.offset = offset;
-    this.elementsTotal = elementsTotal;
   }
 
   /**
@@ -66,13 +62,6 @@ public final class PartitionMetadata<K extends Serializable> {
     return offset;
   }
 
-  /**
-   * @return the total number of elements in this partition.
-   */
-  public long getElementsTotal() {
-    return elementsTotal;
-  }
-
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
@@ -82,8 +71,6 @@ public final class PartitionMetadata<K extends Serializable> {
     sb.append(partitionSize);
     sb.append("/ offset: ");
     sb.append(offset);
-    sb.append("/ elementsTotal: ");
-    sb.append(elementsTotal);
     return sb.toString();
   }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
index 9a22e43..8ad2126 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
@@ -83,7 +83,6 @@ public final class RemoteFileMetadata<K extends Serializable> extends FileMetada
         dataOutputStream.write(key);
         dataOutputStream.writeInt(partitionMetadata.getPartitionSize());
         dataOutputStream.writeLong(partitionMetadata.getOffset());
-        dataOutputStream.writeLong(partitionMetadata.getElementsTotal());
       }
     }
     setCommitted(true);
@@ -127,7 +126,6 @@ public final class RemoteFileMetadata<K extends Serializable> extends FileMetada
         final PartitionMetadata<T> partitionMetadata = new PartitionMetadata<>(
             SerializationUtils.deserialize(desKey),
             dataInputStream.readInt(),
-            dataInputStream.readLong(),
             dataInputStream.readLong()
         );
         partitionMetadataList.add(partitionMetadata);
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
index d6d63d1..4015000 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
@@ -32,7 +32,6 @@ import static edu.snu.nemo.runtime.executor.data.DataUtil.buildOutputStream;
  */
 public final class SerializedPartition<K> implements Partition<byte[], K> {
   private final K key;
-  private volatile long elementsCount;
   private volatile byte[] serializedData;
   private volatile int length;
   private volatile boolean committed;
@@ -52,7 +51,6 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
   public SerializedPartition(final K key,
                              final Serializer serializer) throws IOException {
     this.key = key;
-    this.elementsCount = 0;
     this.serializedData = new byte[0];
     this.length = 0;
     this.committed = false;
@@ -66,16 +64,13 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
    * Data cannot be written to this partition after the construction.
    *
    * @param key            the key.
-   * @param elementsTotal  the total number of elements.
    * @param serializedData the serialized data.
    * @param length         the length of the actual serialized data. (It can be different with serializedData.length)
    */
   public SerializedPartition(final K key,
-                             final long elementsTotal,
                              final byte[] serializedData,
                              final int length) {
     this.key = key;
-    this.elementsCount = elementsTotal;
     this.serializedData = serializedData;
     this.length = length;
     this.committed = true;
@@ -97,7 +92,6 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
     } else {
       try {
         encoder.encode(element);
-        elementsCount++;
       } catch (final IOException e) {
         wrappedStream.close();
       }
@@ -160,16 +154,4 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
       return length;
     }
   }
-
-  /**
-   * @return the number of elements.
-   * @throws IOException if the partition is not committed yet.
-   */
-  public long getElementsCount() throws IOException {
-    if (!committed) {
-      throw new IOException("The partition is not committed yet!");
-    } else {
-      return elementsCount;
-    }
-  }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
index 7a4068e..36db3ec 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
@@ -32,8 +32,8 @@ public final class DataTransferFactory {
   private final int hashRangeMultiplier;
 
   @Inject
-  public DataTransferFactory(@Parameter(JobConf.HashRangeMultiplier.class) final int hashRangeMultiplier,
-                             final BlockManagerWorker blockManagerWorker) {
+  private DataTransferFactory(@Parameter(JobConf.HashRangeMultiplier.class) final int hashRangeMultiplier,
+                              final BlockManagerWorker blockManagerWorker) {
     this.hashRangeMultiplier = hashRangeMultiplier;
     this.blockManagerWorker = blockManagerWorker;
   }
@@ -41,7 +41,7 @@ public final class DataTransferFactory {
   /**
    * Creates an {@link OutputWriter} between two stages.
    *
-   * @param srcIRVertex     the {@link IRVertex} that outputs the data to be written.
+   * @param srcIRVertex the {@link IRVertex} that outputs the data to be written.
    * @param srcTaskIdx  the index of the source task.
    * @param dstIRVertex the {@link IRVertex} that will take the output data as its input.
    * @param runtimeEdge that connects the srcTask to the tasks belonging to dstIRVertex.
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index bb594f6..1744bb2 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -53,12 +53,12 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
    * @param runtimeEdge         the {@link RuntimeEdge}.
    * @param blockManagerWorker  the {@link BlockManagerWorker}.
    */
-  public OutputWriter(final int hashRangeMultiplier,
-                      final int srcTaskIdx,
-                      final String srcRuntimeVertexId,
-                      final IRVertex dstIrVertex,
-                      final RuntimeEdge<?> runtimeEdge,
-                      final BlockManagerWorker blockManagerWorker) {
+  OutputWriter(final int hashRangeMultiplier,
+               final int srcTaskIdx,
+               final String srcRuntimeVertexId,
+               final IRVertex dstIrVertex,
+               final RuntimeEdge<?> runtimeEdge,
+               final BlockManagerWorker blockManagerWorker) {
     super(runtimeEdge.getId());
     this.blockId = RuntimeIdGenerator.generateBlockId(getId(), srcTaskIdx);
     this.runtimeEdge = runtimeEdge;
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index df9454a..f801685 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -91,16 +91,21 @@ import static org.mockito.Mockito.mock;
     SourceVertex.class, ClientRPC.class, MetricManagerMaster.class})
 public final class DataTransferTest {
   private static final String EXECUTOR_ID_PREFIX = "Executor";
-  private static final InterTaskDataStoreProperty.Value MEMORY_STORE = InterTaskDataStoreProperty.Value.MemoryStore;
-  private static final InterTaskDataStoreProperty.Value SER_MEMORY_STORE = InterTaskDataStoreProperty.Value.SerializedMemoryStore;
-  private static final InterTaskDataStoreProperty.Value LOCAL_FILE_STORE = InterTaskDataStoreProperty.Value.LocalFileStore;
-  private static final InterTaskDataStoreProperty.Value REMOTE_FILE_STORE = InterTaskDataStoreProperty.Value.GlusterFileStore;
+  private static final InterTaskDataStoreProperty.Value MEMORY_STORE =
+      InterTaskDataStoreProperty.Value.MemoryStore;
+  private static final InterTaskDataStoreProperty.Value SER_MEMORY_STORE =
+      InterTaskDataStoreProperty.Value.SerializedMemoryStore;
+  private static final InterTaskDataStoreProperty.Value LOCAL_FILE_STORE =
+      InterTaskDataStoreProperty.Value.LocalFileStore;
+  private static final InterTaskDataStoreProperty.Value REMOTE_FILE_STORE =
+      InterTaskDataStoreProperty.Value.GlusterFileStore;
   private static final String TMP_LOCAL_FILE_DIRECTORY = "./tmpLocalFiles";
   private static final String TMP_REMOTE_FILE_DIRECTORY = "./tmpRemoteFiles";
   private static final int PARALLELISM_TEN = 10;
   private static final String EDGE_PREFIX_TEMPLATE = "Dummy(%d)";
   private static final AtomicInteger TEST_INDEX = new AtomicInteger(0);
-  private static final EncoderFactory ENCODER_FACTORY = PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of());
+  private static final EncoderFactory ENCODER_FACTORY =
+      PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of());
   private static final DecoderFactory DECODER_FACTORY =
       PairDecoderFactory.of(IntDecoderFactory.of(), IntDecoderFactory.of());
   private static final Tang TANG = Tang.Factory.getTang();
@@ -108,6 +113,7 @@ public final class DataTransferTest {
 
   private BlockManagerMaster master;
   private BlockManagerWorker worker1;
+  private DataTransferFactory transferFactory;
   private BlockManagerWorker worker2;
   private HashMap<BlockManagerWorker, SerializerManager> serializerManagers = new HashMap<>();
 
@@ -152,10 +158,12 @@ public final class DataTransferTest {
     injector2.bindVolatileParameter(JobConf.JobId.class, "data transfer test");
 
     this.master = master;
-    this.worker1 = createWorker(EXECUTOR_ID_PREFIX + executorCount.getAndIncrement(), messageDispatcher,
-        injector2);
+    final Pair<BlockManagerWorker, DataTransferFactory> pair1 =
+        createWorker(EXECUTOR_ID_PREFIX + executorCount.getAndIncrement(), messageDispatcher, injector2);
+    this.worker1 = pair1.left();
+    this.transferFactory = pair1.right();
     this.worker2 = createWorker(EXECUTOR_ID_PREFIX + executorCount.getAndIncrement(), messageDispatcher,
-        injector2);
+        injector2).left();
   }
 
   @After
@@ -164,8 +172,10 @@ public final class DataTransferTest {
     FileUtils.deleteDirectory(new File(TMP_REMOTE_FILE_DIRECTORY));
   }
 
-  private BlockManagerWorker createWorker(final String executorId, final LocalMessageDispatcher messageDispatcher,
-                                          final Injector nameClientInjector) {
+  private Pair<BlockManagerWorker, DataTransferFactory> createWorker(
+      final String executorId,
+      final LocalMessageDispatcher messageDispatcher,
+      final Injector nameClientInjector) {
     final LocalMessageEnvironment messageEnvironment = new LocalMessageEnvironment(executorId, messageDispatcher);
     final PersistentConnectionToMasterMap conToMaster = new PersistentConnectionToMasterMap(messageEnvironment);
     final Configuration executorConfiguration = TANG.newConfigurationBuilder()
@@ -180,11 +190,13 @@ public final class DataTransferTest {
     final BlockManagerWorker blockManagerWorker;
     final MetricManagerWorker metricManagerWorker;
     final SerializerManager serializerManager;
+    final DataTransferFactory dataTransferFactory;
     try {
       blockManagerWorker = injector.getInstance(BlockManagerWorker.class);
-      metricManagerWorker =  injector.getInstance(MetricManagerWorker.class);
+      metricManagerWorker = injector.getInstance(MetricManagerWorker.class);
       serializerManager = injector.getInstance(SerializerManager.class);
       serializerManagers.put(blockManagerWorker, serializerManager);
+      dataTransferFactory = injector.getInstance(DataTransferFactory.class);
     } catch (final InjectionException e) {
       throw new RuntimeException(e);
     }
@@ -195,11 +207,11 @@ public final class DataTransferTest {
         conToMaster,
         messageEnvironment,
         serializerManager,
-        new DataTransferFactory(HASH_RANGE_MULTIPLIER, blockManagerWorker),
+        dataTransferFactory,
         metricManagerWorker);
     injector.bindVolatileInstance(Executor.class, executor);
 
-    return blockManagerWorker;
+    return Pair.of(blockManagerWorker, dataTransferFactory);
   }
 
   private Injector createNameClientInjector() {
@@ -336,8 +348,7 @@ public final class DataTransferTest {
     final List<List> dataWrittenList = new ArrayList<>();
     IntStream.range(0, PARALLELISM_TEN).forEach(srcTaskIndex -> {
       final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
-      final OutputWriter writer = new OutputWriter(HASH_RANGE_MULTIPLIER, srcTaskIndex, srcVertex.getId(), dstVertex,
-          dummyEdge, sender);
+      final OutputWriter writer = transferFactory.createWriter(srcVertex, srcTaskIndex, dstVertex, dummyEdge);
       dataWritten.iterator().forEachRemaining(writer::write);
       writer.close();
       dataWrittenList.add(dataWritten);
@@ -432,14 +443,12 @@ public final class DataTransferTest {
     final List<List> dataWrittenList = new ArrayList<>();
     IntStream.range(0, PARALLELISM_TEN).forEach(srcTaskIndex -> {
       final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
-      final OutputWriter writer = new OutputWriter(HASH_RANGE_MULTIPLIER, srcTaskIndex, srcVertex.getId(), dstVertex,
-          dummyEdge, sender);
+      final OutputWriter writer = transferFactory.createWriter(srcVertex, srcTaskIndex, dstVertex, dummyEdge);
       dataWritten.iterator().forEachRemaining(writer::write);
       writer.close();
       dataWrittenList.add(dataWritten);
 
-      final OutputWriter writer2 = new OutputWriter(HASH_RANGE_MULTIPLIER, srcTaskIndex, srcVertex.getId(), dstVertex,
-          dummyEdge2, sender);
+      final OutputWriter writer2 = transferFactory.createWriter(srcVertex, srcTaskIndex, dstVertex, dummyEdge2);
       dataWritten.iterator().forEachRemaining(writer2::write);
       writer2.close();
     });
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
index 5d24f81..ab1beb7 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
@@ -63,9 +63,8 @@ public class SailfishPassTest {
                 edgeToMerger.getPropertyValue(UsedDataHandlingProperty.class).get());
             assertEquals(InterTaskDataStoreProperty.Value.SerializedMemoryStore,
                 edgeToMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
-            // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
-            //assertEquals(BytesDecoderFactory.of(),
-            //    edgeToMerger.getPropertyValue(DecoderProperty.class).get());
+            assertEquals(BytesDecoderFactory.of(),
+                edgeToMerger.getPropertyValue(DecoderProperty.class).get());
           } else {
             assertEquals(DataFlowModelProperty.Value.Pull,
                 edgeToMerger.getPropertyValue(DataFlowModelProperty.class).get());
@@ -78,9 +77,8 @@ public class SailfishPassTest {
               edgeFromMerger.getPropertyValue(DataCommunicationPatternProperty.class).get());
           assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore,
               edgeFromMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
-          // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
-          //assertEquals(BytesEncoderFactory.of(),
-          //    edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
+          assertEquals(BytesEncoderFactory.of(),
+              edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
         });
       } else {
         // Non merger vertex.