You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/06/25 06:29:54 UTC

[incubator-nemo] branch master updated: [NEMO-350] Implement Off-heap SerializedMemoryStore & [NEMO-384] Implement DirectByteBufferInputStream for Off-heap SerializedMemoryStore (#222)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new ea448db  [NEMO-350] Implement Off-heap SerializedMemoryStore & [NEMO-384] Implement DirectByteBufferInputStream for Off-heap SerializedMemoryStore (#222)
ea448db is described below

commit ea448dbca2ba46d27716b6d55f9072be5aaa4eec
Author: Haeyoon Cho <ch...@gmail.com>
AuthorDate: Tue Jun 25 15:29:49 2019 +0900

    [NEMO-350] Implement Off-heap SerializedMemoryStore & [NEMO-384] Implement DirectByteBufferInputStream for Off-heap SerializedMemoryStore (#222)
    
    JIRA: [NEMO-350: Implement Off-heap SerializedMemoryStore](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-350)
    [NEMO-384: Implement DirectByteBufferInputStream for Off-heap SerializedMemoryStore](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-384)
    
    **Major changes:**
    - When a block is emitted by an executor, we write it directly to off-heap memory using `DirectByteBufferOutputStream` and read it back using `ByteBufferInputStream`.
    
    **Minor changes to note:**
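    - `getData()` and `getDirectBufferList()` must be distinguished when acquiring data from a `SerializedPartition`: `getData()` is only valid for on-heap partitions, while `getDirectBufferList()` returns the data as a list of `ByteBuffer`s.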
    
    **Other comments:**
    - This implementation does not guarantee a performance gain, because the overhead of `allocateDirect` (malloc) can exceed the garbage collection overhead it avoids. For this reason, off-heap memory management (reuse of `ByteBuffer`s, TODO #388) is being implemented.
---
 .../apache/nemo/common/ByteBufferInputStream.java  | 75 ++++++++++++++++++++++
 .../nemo/common/DirectByteArrayOutputStream.java   | 55 ----------------
 .../nemo/common/DirectByteBufferOutputStream.java  | 66 ++++++++++++-------
 .../nemo/common/coder/BytesDecoderFactory.java     |  4 +-
 .../common/DirectByteBufferOutputStreamTest.java   |  4 +-
 .../executor/bytetransfer/ByteOutputContext.java   | 60 +++++++++--------
 .../runtime/executor/data/BlockManagerWorker.java  |  2 +-
 .../nemo/runtime/executor/data/DataUtil.java       | 20 +++---
 .../runtime/executor/data/block/FileBlock.java     | 23 +++++--
 .../data/partition/SerializedPartition.java        | 71 ++++++++++++++++++--
 10 files changed, 249 insertions(+), 131 deletions(-)

diff --git a/common/src/main/java/org/apache/nemo/common/ByteBufferInputStream.java b/common/src/main/java/org/apache/nemo/common/ByteBufferInputStream.java
new file mode 100644
index 0000000..9e07c7f
--- /dev/null
+++ b/common/src/main/java/org/apache/nemo/common/ByteBufferInputStream.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * This class is a customized input stream implementation which reads data from
+ * list of {@link ByteBuffer}. If the {@link ByteBuffer} is direct, it may reside outside
+ * the normal garbage-collected heap memory.
+ */
+public class ByteBufferInputStream extends InputStream {
+  private List<ByteBuffer> bufList;
+  private int current = 0;
+  private static final int BITMASK = 0xff;
+
+  /**
+   * Creates a stream that reads the given buffers in list order, starting from the first one.
+   *
+   * @param bufList the buffers to read from; the list is referenced, not copied.
+   */
+  public ByteBufferInputStream(final List<ByteBuffer> bufList) {
+   this.bufList = bufList;
+  }
+
+  /**
+   * Reads the next byte from the list of {@code ByteBuffer}s.
+   *
+   * @return the next byte as an unsigned value in the range 0 to 255; -1 is never returned.
+   * @throws IOException an {@link EOFException} once all remaining buffers are exhausted.
+   */
+  @Override
+  public int read() throws IOException {
+    // Since java's byte is signed type, we have to mask it to make byte
+    // become unsigned type to properly retrieve `int` from sequence of bytes.
+    return getBuffer().get() & BITMASK;
+  }
+
+  /**
+   * Returns the next {@link ByteBuffer} that still has remaining bytes, skipping exhausted ones.
+   *
+   * @return the {@link ByteBuffer} to read the data from.
+   * @throws IOException an {@link EOFException} when all remaining buffers are exhausted.
+   */
+  public ByteBuffer getBuffer() throws IOException {
+    while (current < bufList.size()) {
+      ByteBuffer buffer = bufList.get(current);
+      if (buffer.hasRemaining()) {
+        return buffer;
+      }
+      current += 1;
+    }
+    throw new EOFException();
+  }
+}
diff --git a/common/src/main/java/org/apache/nemo/common/DirectByteArrayOutputStream.java b/common/src/main/java/org/apache/nemo/common/DirectByteArrayOutputStream.java
deleted file mode 100644
index 9f41e5d..0000000
--- a/common/src/main/java/org/apache/nemo/common/DirectByteArrayOutputStream.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.common;
-
-import java.io.ByteArrayOutputStream;
-
-/**
- * This class represents a custom implementation of {@link ByteArrayOutputStream},
- * which enables to get bytes buffer directly (without memory copy).
- * TODO #370: Substitute ByteArrayOutputStream with java.nio.ByteBuffer
- */
-public final class DirectByteArrayOutputStream extends ByteArrayOutputStream {
-
-  /**
-   * Default constructor.
-   */
-  public DirectByteArrayOutputStream() {
-    super();
-  }
-
-  /**
-   * Constructor specifying the size.
-   *
-   * @param size the initial size.
-   */
-  public DirectByteArrayOutputStream(final int size) {
-    super(size);
-  }
-
-  /**
-   * Note that serializedBytes include invalid bytes.
-   * So we have to use it with the actualLength by using size() whenever needed.
-   *
-   * @return the buffer where data is stored.
-   */
-  public byte[] getBufDirectly() {
-    return buf;
-  }
-}
diff --git a/common/src/main/java/org/apache/nemo/common/DirectByteBufferOutputStream.java b/common/src/main/java/org/apache/nemo/common/DirectByteBufferOutputStream.java
index 7d96dfa..8a74fb2 100644
--- a/common/src/main/java/org/apache/nemo/common/DirectByteBufferOutputStream.java
+++ b/common/src/main/java/org/apache/nemo/common/DirectByteBufferOutputStream.java
@@ -18,8 +18,11 @@
  */
 package org.apache.nemo.common;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -27,15 +30,17 @@ import java.util.List;
  * This class is a customized output stream implementation backed by
  * {@link ByteBuffer}, which utilizes off heap memory when writing the data.
  * Memory is allocated when needed by the specified {@code pageSize}.
+ * Deletion of {@code dataList}, which is the memory this outputstream holds, occurs
+ * when the corresponding block is deleted.
+ * TODO #388: Off-heap memory management (reuse ByteBuffer) - implement reuse.
  */
 public final class DirectByteBufferOutputStream extends OutputStream {
 
   private LinkedList<ByteBuffer> dataList = new LinkedList<>();
-  private static final int DEFAULT_PAGE_SIZE = 4096;
+  private static final int DEFAULT_PAGE_SIZE = 32768; //32KB
   private final int pageSize;
   private ByteBuffer currentBuf;
 
-
   /**
    * Default constructor.
    * Sets the {@code pageSize} as default size of 4096 bytes.
@@ -45,8 +50,9 @@ public final class DirectByteBufferOutputStream extends OutputStream {
   }
 
   /**
-   * Constructor specifying the {@code size}.
-   * Sets the {@code pageSize} as {@code size}.
+   * Constructor which sets {@code pageSize} as specified {@code size}.
+   * Note that the {@code pageSize} has trade-off between memory fragmentation and
+   * native memory (de)allocation overhead.
    *
    * @param size should be a power of 2 and greater than or equal to 4096.
    */
@@ -62,6 +68,7 @@ public final class DirectByteBufferOutputStream extends OutputStream {
   /**
    * Allocates new {@link ByteBuffer} with the capacity equal to {@code pageSize}.
    */
+  // TODO #388: Off-heap memory management (reuse ByteBuffer)
   private void newLastBuffer() {
     dataList.addLast(ByteBuffer.allocateDirect(pageSize));
   }
@@ -120,14 +127,15 @@ public final class DirectByteBufferOutputStream extends OutputStream {
     }
   }
 
+
   /**
    * Creates a byte array that contains the whole content currently written in this output stream.
-   * Note that this method causes array copy which could degrade performance.
-   * TODO #384: For performance issue, implement an input stream so that we do not have to use this method.
    *
+   * USED BY TESTS ONLY.
    * @return the current contents of this output stream, as byte array.
    */
-  public byte[] toByteArray() {
+  @VisibleForTesting
+  byte[] toByteArray() {
     if (dataList.isEmpty()) {
       final byte[] byteArray = new byte[0];
       return byteArray;
@@ -140,39 +148,49 @@ public final class DirectByteBufferOutputStream extends OutputStream {
     final int arraySize = pageSize * (dataList.size() - 1) + lastBuf.position();
     final byte[] byteArray = new byte[arraySize];
     int start = 0;
-    int byteToWrite;
-
-    for (final ByteBuffer temp : dataList) {
-      // ByteBuffer has to be shifted to read mode by calling ByteBuffer.flip(),
-      // which sets limit to the current position and sets the position to 0.
-      // Note that capacity remains unchanged.
-      temp.flip();
-      byteToWrite = temp.remaining();
-      temp.get(byteArray, start, byteToWrite);
+
+    for (final ByteBuffer buffer : dataList) {
+      // We use duplicated buffer to read the data so that there is no complicated
+      // alteration of position and limit when switching between read and write mode.
+      final ByteBuffer dupBuffer = buffer.duplicate();
+      dupBuffer.flip();
+      final int byteToWrite = dupBuffer.remaining();
+      dupBuffer.get(byteArray, start, byteToWrite);
       start += byteToWrite;
     }
-    // The limit of the last buffer has to be set to the capacity for additional write.
-    lastBuf.limit(lastBuf.capacity());
 
     return byteArray;
   }
 
   /**
    * Returns the list of {@code ByteBuffer}s that contains the written data.
-   * Note that by calling this method, the existing list of {@code ByteBuffer}s is cleared.
+   * List of flipped and duplicated {@link ByteBuffer}s are returned which has independent
+   * position and limit, to reduce erroneous data read/write.
+   * This function has to be called when intended to read from the start of the list of
+   * {@link ByteBuffer}s, not for additional write.
    *
    * @return the {@code LinkedList} of {@code ByteBuffer}s.
    */
-  public List<ByteBuffer> getBufferListAndClear() {
-    List<ByteBuffer> result = dataList;
-    dataList = new LinkedList<>();
-    for (final ByteBuffer buffer : result) {
-      buffer.flip();
+  public List<ByteBuffer> getDirectByteBufferList() {
+    List<ByteBuffer> result = new ArrayList<>(dataList.size());
+    for (final ByteBuffer buffer : dataList) {
+      final ByteBuffer dupBuffer = buffer.duplicate();
+      dupBuffer.flip();
+      result.add(dupBuffer);
     }
     return result;
   }
 
   /**
+   * Returns the size of the data written in this output stream.
+   *
+   * @return the size of the data
+   */
+  public int size() {
+    return pageSize * (dataList.size() - 1) + dataList.getLast().position();
+  }
+
+  /**
    * Closing this output stream has no effect.
    */
   public void close() {
diff --git a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
index 594d412..847ad8f 100644
--- a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
@@ -18,10 +18,10 @@
  */
 package org.apache.nemo.common.coder;
 
-import org.apache.nemo.common.DirectByteArrayOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -82,7 +82,7 @@ public final class BytesDecoderFactory implements DecoderFactory<byte[]> {
     public byte[] decode() throws IOException {
       // We cannot use inputStream.available() to know the length of bytes to read.
       // The available method only returns the number of bytes can be read without blocking.
-      final DirectByteArrayOutputStream byteOutputStream = new DirectByteArrayOutputStream();
+      final ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
       int b = inputStream.read();
       while (b != -1) {
         byteOutputStream.write(b);
diff --git a/common/src/test/java/org/apache/nemo/common/DirectByteBufferOutputStreamTest.java b/common/src/test/java/org/apache/nemo/common/DirectByteBufferOutputStreamTest.java
index c4dbfb1..564ec5a 100644
--- a/common/src/test/java/org/apache/nemo/common/DirectByteBufferOutputStreamTest.java
+++ b/common/src/test/java/org/apache/nemo/common/DirectByteBufferOutputStreamTest.java
@@ -97,11 +97,11 @@ public class DirectByteBufferOutputStreamTest {
   }
 
   @Test
-  public void testGetBufferList() {
+  public void testGetDirectBufferList() {
     String value = RandomStringUtils.randomAlphanumeric(10000);
     outputStream.write(value.getBytes());
     byte[] totalOutput = outputStream.toByteArray();
-    List<ByteBuffer> bufList = outputStream.getBufferListAndClear();
+    List<ByteBuffer> bufList = outputStream.getDirectByteBufferList();
     int offset = 0;
     int byteToRead;
     for (final ByteBuffer temp : bufList) {
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
index 019358a..01ba02a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
@@ -34,9 +34,13 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.List;
+
+import static io.netty.buffer.Unpooled.wrappedBuffer;
 
 /**
  * Container for multiple output streams. Represents a transfer context on sender-side.
@@ -131,25 +135,11 @@ public final class ByteOutputContext extends ByteTransferContext implements Auto
    * <p>Public methods are thread safe,
    * although the execution order may not be linearized if they were called from different threads.</p>
    */
-  public final class ByteOutputStream extends OutputStream {
+  public final class ByteOutputStream implements AutoCloseable {
 
     private volatile boolean newSubStream = true;
     private volatile boolean closed = false;
 
-    @Override
-    public void write(final int i) throws IOException {
-      final ByteBuf byteBuf = channel.alloc().ioBuffer(1, 1);
-      byteBuf.writeByte(i);
-      writeByteBuf(byteBuf);
-    }
-
-    @Override
-    public void write(final byte[] bytes, final int offset, final int length) throws IOException {
-      final ByteBuf byteBuf = channel.alloc().ioBuffer(length, length);
-      byteBuf.writeBytes(bytes, offset, length);
-      writeByteBuf(byteBuf);
-    }
-
     /**
      * Writes {@link SerializedPartition}.
      *
@@ -157,13 +147,38 @@ public final class ByteOutputContext extends ByteTransferContext implements Auto
      * @return {@code this}
      * @throws IOException when an exception has been set or this stream was closed
      */
-    public ByteOutputStream writeSerializedPartition(final SerializedPartition serializedPartition)
+    public ByteOutputStream writeSerializedPartitionBuffer(final SerializedPartition serializedPartition)
       throws IOException {
-      write(serializedPartition.getData(), 0, serializedPartition.getLength());
+      writeBuffer(serializedPartition.getDirectBufferList());
       return this;
     }
 
     /**
+     * Wraps each of the {@link ByteBuffer} in the bufList to {@link ByteBuf} object
+     * to write a data frame.
+     *
+     * @param bufList       list of {@link ByteBuffer}s to wrap.
+     * @throws IOException  when fails to write the data.
+     */
+    public void writeBuffer(final List<ByteBuffer> bufList) throws IOException {
+      final ByteBuf byteBuf = wrappedBuffer(bufList.toArray(new ByteBuffer[bufList.size()]));
+      writeByteBuf(byteBuf);
+    }
+
+
+    /**
+     * Writes a data frame, from {@link ByteBuf}.
+     *
+     * @param byteBuf       {@link ByteBuf} to write.
+     * @throws IOException  when fails to write data.
+     */
+    private void writeByteBuf(final ByteBuf byteBuf) throws IOException {
+      if (byteBuf.readableBytes() > 0) {
+        writeDataFrame(byteBuf, byteBuf.readableBytes());
+      }
+    }
+
+    /**
      * Writes a data frame from {@link FileArea}.
      *
      * @param fileArea the {@link FileArea} to transfer
@@ -197,17 +212,6 @@ public final class ByteOutputContext extends ByteTransferContext implements Auto
     }
 
     /**
-     * Writes a data frame, from {@link ByteBuf}.
-     *
-     * @param byteBuf {@link ByteBuf} to write.
-     */
-    private void writeByteBuf(final ByteBuf byteBuf) throws IOException {
-      if (byteBuf.readableBytes() > 0) {
-        writeDataFrame(byteBuf, byteBuf.readableBytes());
-      }
-    }
-
-    /**
      * Write an element to the channel.
      *
      * @param element    element
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
index 6aed5f1..8806432 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -356,7 +356,7 @@ public final class BlockManagerWorker {
               final Iterable<SerializedPartition> partitions = optionalBlock.get().readSerializedPartitions(keyRange);
               for (final SerializedPartition partition : partitions) {
                 try (ByteOutputContext.ByteOutputStream os = outputContext.newOutputStream()) {
-                  os.writeSerializedPartition(partition);
+                  os.writeSerializedPartitionBuffer(partition);
                 }
               }
             }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
index 794e8c8..f0362dc 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
@@ -19,7 +19,8 @@
 package org.apache.nemo.runtime.executor.data;
 
 import com.google.common.io.CountingInputStream;
-import org.apache.nemo.common.DirectByteArrayOutputStream;
+import org.apache.nemo.common.ByteBufferInputStream;
+import org.apache.nemo.common.DirectByteBufferOutputStream;
 import org.apache.nemo.common.coder.DecoderFactory;
 import org.apache.nemo.common.coder.EncoderFactory;
 import org.apache.nemo.runtime.executor.data.partition.NonSerializedPartition;
@@ -31,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -112,8 +114,8 @@ public final class DataUtil {
     final List<SerializedPartition<K>> serializedPartitions = new ArrayList<>();
     for (final NonSerializedPartition<K> partitionToConvert : partitionsToConvert) {
       try (
-        DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream();
-        OutputStream wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
+        DirectByteBufferOutputStream bytesOutputStream = new DirectByteBufferOutputStream();
+        OutputStream wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers())
       ) {
         serializePartition(serializer.getEncoderFactory(), partitionToConvert, wrappedStream);
         // We need to close wrappedStream on here, because DirectByteArrayOutputStream:getBufDirectly() returns
@@ -121,10 +123,10 @@ public final class DataUtil {
         wrappedStream.close();
         // Note that serializedBytes include invalid bytes.
         // So we have to use it with the actualLength by using size() whenever needed.
-        final byte[] serializedBytes = bytesOutputStream.getBufDirectly();
+        final List<ByteBuffer> serializedBufList = bytesOutputStream.getDirectByteBufferList();
         final int actualLength = bytesOutputStream.size();
         serializedPartitions.add(
-          new SerializedPartition<>(partitionToConvert.getKey(), serializedBytes, actualLength));
+          new SerializedPartition<>(partitionToConvert.getKey(), serializedBufList, actualLength));
       }
     }
     return serializedPartitions;
@@ -146,12 +148,12 @@ public final class DataUtil {
     final List<NonSerializedPartition<K>> nonSerializedPartitions = new ArrayList<>();
     for (final SerializedPartition<K> partitionToConvert : partitionsToConvert) {
       final K key = partitionToConvert.getKey();
+      try (InputStream inputStream = partitionToConvert.isOffheap()
+        ? new ByteBufferInputStream(partitionToConvert.getDirectBufferList())
+        : new ByteArrayInputStream(partitionToConvert.getData())) {
 
-
-      try (ByteArrayInputStream byteArrayInputStream =
-             new ByteArrayInputStream(partitionToConvert.getData())) {
         final NonSerializedPartition<K> deserializePartition = deserializePartition(
-          partitionToConvert.getLength(), serializer, key, byteArrayInputStream);
+          partitionToConvert.getLength(), serializer, key, inputStream);
         nonSerializedPartitions.add(deserializePartition);
       }
     }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
index cac065c..65b167b 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
@@ -34,10 +34,23 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
-import java.io.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 /**
  * This class represents a block which is stored in (local or remote) file.
@@ -83,11 +96,13 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
    */
   private void writeToFile(final Iterable<SerializedPartition<K>> serializedPartitions)
     throws IOException {
-    try (FileOutputStream fileOutputStream = new FileOutputStream(filePath, true)) {
+    try (FileChannel fileOutputChannel = new FileOutputStream(filePath, true).getChannel()) {
       for (final SerializedPartition<K> serializedPartition : serializedPartitions) {
         // Reserve a partition write and get the metadata.
         metadata.writePartitionMetadata(serializedPartition.getKey(), serializedPartition.getLength());
-        fileOutputStream.write(serializedPartition.getData(), 0, serializedPartition.getLength());
+        for (final ByteBuffer buffer: serializedPartition.getDirectBufferList()) {
+          fileOutputChannel.write(buffer);
+        }
       }
     }
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
index e2f57a5..070b618 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.data.partition;
 
-import org.apache.nemo.common.DirectByteArrayOutputStream;
+import org.apache.nemo.common.DirectByteBufferOutputStream;
 import org.apache.nemo.common.coder.EncoderFactory;
 import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
 import org.slf4j.Logger;
@@ -27,12 +27,17 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.apache.nemo.runtime.executor.data.DataUtil.buildOutputStream;
 
 /**
  * A collection of data elements. The data is stored as an array of bytes.
  * This is a unit of read / write towards {@link org.apache.nemo.runtime.executor.data.block.Block}s.
+ * Releasing the memory(either off-heap or on-heap) occurs on block deletion.
+ * TODO #396: Refactoring SerializedPartition into multiple classes
  *
  * @param <K> the key type of its partitions.
  */
@@ -45,11 +50,13 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
   private volatile boolean committed;
   // Will be null when the partition is committed when it is constructed.
   @Nullable
-  private final DirectByteArrayOutputStream bytesOutputStream;
+  private final DirectByteBufferOutputStream bytesOutputStream;
   @Nullable
   private final OutputStream wrappedStream;
   @Nullable
   private final EncoderFactory.Encoder encoder;
+  private volatile List<ByteBuffer> dataList;
+  private final boolean offheap;
 
   /**
    * Creates a serialized {@link Partition} without actual data.
@@ -65,13 +72,14 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
     this.serializedData = new byte[0];
     this.length = 0;
     this.committed = false;
-    this.bytesOutputStream = new DirectByteArrayOutputStream();
+    this.bytesOutputStream = new DirectByteBufferOutputStream();
     this.wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
     this.encoder = serializer.getEncoderFactory().create(wrappedStream);
+    this.offheap = true;
   }
 
   /**
-   * Creates a serialized {@link Partition} with actual data.
+   * Creates a serialized {@link Partition} with actual data residing in on-heap region.
    * Data cannot be written to this partition after the construction.
    *
    * @param key            the key.
@@ -88,6 +96,28 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
     this.bytesOutputStream = null;
     this.wrappedStream = null;
     this.encoder = null;
+    this.offheap = false;
+  }
+
+  /**
+   * Creates a serialized {@link Partition} with actual data residing in off-heap region.
+   * Data cannot be written to this partition after the construction.
+   *
+   * @param key               the key.
+   * @param serializedBufList the serialized data as a list of {@link ByteBuffer}s.
+   * @param length            the length of the actual serialized data. (It can be different with serializedData.length)
+   */
+  public SerializedPartition(final K key,
+                             final List<ByteBuffer> serializedBufList,
+                             final int length) {
+    this.key = key;
+    this.dataList = serializedBufList;
+    this.length = length;
+    this.committed = true;
+    this.bytesOutputStream = null;
+    this.wrappedStream = null;
+    this.encoder = null;
+    this.offheap = true;
   }
 
   /**
@@ -120,8 +150,7 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
       // We need to close wrappedStream on here, because DirectByteArrayOutputStream:getBufDirectly() returns
       // inner buffer directly, which can be an unfinished(not flushed) buffer.
       wrappedStream.close();
-      this.serializedData = bytesOutputStream.getBufDirectly();
-
+      this.dataList = bytesOutputStream.getDirectByteBufferList();
       this.length = bytesOutputStream.size();
       this.committed = true;
     }
@@ -144,6 +173,8 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
   }
 
   /**
+   * This method should only be used when this partition is residing in on-heap region.
+   *
    * @return the serialized data.
    * @throws IOException if the partition is not committed yet.
    */
@@ -151,12 +182,33 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
   public byte[] getData() throws IOException {
     if (!committed) {
       throw new IOException("The partition is not committed yet!");
+    } else if (offheap) {
+      throw new RuntimeException("This partition does not have on-heap data");
     } else {
       return serializedData;
     }
   }
 
   /**
+   * This method is used to emit the output as {@link SerializedPartition}.
+   *
+   * @return the serialized data in list of {@link ByteBuffer}s
+   * @throws IOException if the partition is not committed yet.
+   */
+  public List<ByteBuffer> getDirectBufferList() throws IOException {
+    if (!committed) {
+      throw new IOException("The partition is not committed yet!");
+    } else {
+      List<ByteBuffer> result = new ArrayList<>(dataList.size());
+      for (final ByteBuffer buffer : dataList) {
+        final ByteBuffer dupBuffer = buffer.duplicate();
+        result.add(dupBuffer);
+      }
+      return result;
+    }
+  }
+
+  /**
    * @return the length of the actual data.
    * @throws IOException if the partition is not committed yet.
    */
@@ -167,4 +219,11 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
       return length;
     }
   }
+
+  /**
+   * @return whether this {@code SerializedPartition} is residing in off-heap region.
+   */
+  public boolean isOffheap() {
+    return offheap;
+  }
 }