You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/06/25 06:29:54 UTC
[incubator-nemo] branch master updated: [NEMO-350] Implement
Off-heap SerializedMemoryStore & [NEMO-384] Implement
DirectByteBufferInputStream for Off-heap SerializedMemoryStore (#222)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new ea448db [NEMO-350] Implement Off-heap SerializedMemoryStore & [NEMO-384] Implement DirectByteBufferInputStream for Off-heap SerializedMemoryStore (#222)
ea448db is described below
commit ea448dbca2ba46d27716b6d55f9072be5aaa4eec
Author: Haeyoon Cho <ch...@gmail.com>
AuthorDate: Tue Jun 25 15:29:49 2019 +0900
[NEMO-350] Implement Off-heap SerializedMemoryStore & [NEMO-384] Implement DirectByteBufferInputStream for Off-heap SerializedMemoryStore (#222)
JIRA: [NEMO-350: Implement Off-heap SerializedMemoryStore](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-350)
[NEMO-384: Implement DirectByteBufferInputStream for Off-heap SerializedMemoryStore](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-384)
**Major changes:**
- When a block is emitted by an executor, we write it directly to off-heap memory using `DirectByteBufferOutputStream`, and read it back using `ByteBufferInputStream`.
**Minor changes to note:**
- `getData()` (on-heap data) and `getDirectBufferList()` (off-heap data) must be distinguished when acquiring data from `SerializedPartition`; use `isOffheap()` to tell which one applies.
**Other comments:**
- This implementation does not guarantee a performance gain, because the overhead of `allocateDirect` (native malloc) can exceed the garbage-collection overhead it saves. For this reason, off-heap memory management (reusing `ByteBuffer`s, TODO #388) is being implemented.
---
.../apache/nemo/common/ByteBufferInputStream.java | 75 ++++++++++++++++++++++
.../nemo/common/DirectByteArrayOutputStream.java | 55 ----------------
.../nemo/common/DirectByteBufferOutputStream.java | 66 ++++++++++++-------
.../nemo/common/coder/BytesDecoderFactory.java | 4 +-
.../common/DirectByteBufferOutputStreamTest.java | 4 +-
.../executor/bytetransfer/ByteOutputContext.java | 60 +++++++++--------
.../runtime/executor/data/BlockManagerWorker.java | 2 +-
.../nemo/runtime/executor/data/DataUtil.java | 20 +++---
.../runtime/executor/data/block/FileBlock.java | 23 +++++--
.../data/partition/SerializedPartition.java | 71 ++++++++++++++++++--
10 files changed, 249 insertions(+), 131 deletions(-)
diff --git a/common/src/main/java/org/apache/nemo/common/ByteBufferInputStream.java b/common/src/main/java/org/apache/nemo/common/ByteBufferInputStream.java
new file mode 100644
index 0000000..9e07c7f
--- /dev/null
+++ b/common/src/main/java/org/apache/nemo/common/ByteBufferInputStream.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * This class is a customized input stream implementation which reads data from
+ * list of {@link ByteBuffer}. If the {@link ByteBuffer} is direct, it may reside outside
+ * the normal garbage-collected heap memory.
+ */
+public class ByteBufferInputStream extends InputStream {
+ private final List<ByteBuffer> bufList;
+ private int current = 0;
+ private static final int BITMASK = 0xff;
+
+ /**
+ * Default Constructor.
+ *
+ * @param bufList is the target data to read.
+ */
+ public ByteBufferInputStream(final List<ByteBuffer> bufList) {
+ this.bufList = bufList;
+ }
+
+ /**
+ * Reads the next byte from the list of {@code ByteBuffer}s.
+ *
+ * @return the next byte as an unsigned value in the range 0 to 255,
+ * or -1 if every buffer has been exhausted, as required by {@link InputStream#read()}.
+ * @throws IOException declared by {@link InputStream}; not thrown by this implementation.
+ */
+ @Override
+ public int read() throws IOException {
+ final ByteBuffer buffer = nextNonEmptyBuffer();
+ if (buffer == null) {
+ return -1;
+ }
+ // Since java's byte is signed type, we have to mask it to make byte
+ // become unsigned type to properly retrieve `int` from sequence of bytes.
+ return buffer.get() & BITMASK;
+ }
+
+ /**
+ * Reads up to {@code len} bytes into {@code b}, copying from the buffers in bulk
+ * rather than one byte at a time.
+ *
+ * @param b the destination array.
+ * @param off the start offset in {@code b}.
+ * @param len the maximum number of bytes to read.
+ * @return the number of bytes read, or -1 if every buffer has been exhausted.
+ * @throws IOException declared by {@link InputStream}; not thrown by this implementation.
+ */
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException {
+ if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (len == 0) {
+ return 0;
+ }
+ int copied = 0;
+ while (copied < len) {
+ final ByteBuffer buffer = nextNonEmptyBuffer();
+ if (buffer == null) {
+ break;
+ }
+ final int chunk = Math.min(len - copied, buffer.remaining());
+ buffer.get(b, off + copied, chunk);
+ copied += chunk;
+ }
+ return copied == 0 ? -1 : copied;
+ }
+
+ /**
+ * Returns the next non-empty {@code ByteBuffer} to read from.
+ *
+ * @return the next {@code ByteBuffer} that has remaining bytes.
+ * @throws IOException an {@link EOFException} when every buffer has been exhausted.
+ */
+ public ByteBuffer getBuffer() throws IOException {
+ final ByteBuffer buffer = nextNonEmptyBuffer();
+ if (buffer == null) {
+ throw new EOFException();
+ }
+ return buffer;
+ }
+
+ /**
+ * Skips exhausted buffers.
+ *
+ * @return the next {@code ByteBuffer} with remaining bytes, or {@code null} if there is none.
+ */
+ private ByteBuffer nextNonEmptyBuffer() {
+ while (current < bufList.size()) {
+ final ByteBuffer buffer = bufList.get(current);
+ if (buffer.hasRemaining()) {
+ return buffer;
+ }
+ current += 1;
+ }
+ return null;
+ }
+}
diff --git a/common/src/main/java/org/apache/nemo/common/DirectByteArrayOutputStream.java b/common/src/main/java/org/apache/nemo/common/DirectByteArrayOutputStream.java
deleted file mode 100644
index 9f41e5d..0000000
--- a/common/src/main/java/org/apache/nemo/common/DirectByteArrayOutputStream.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.common;
-
-import java.io.ByteArrayOutputStream;
-
-/**
- * This class represents a custom implementation of {@link ByteArrayOutputStream},
- * which enables to get bytes buffer directly (without memory copy).
- * TODO #370: Substitute ByteArrayOutputStream with java.nio.ByteBuffer
- */
-public final class DirectByteArrayOutputStream extends ByteArrayOutputStream {
-
- /**
- * Default constructor.
- */
- public DirectByteArrayOutputStream() {
- super();
- }
-
- /**
- * Constructor specifying the size.
- *
- * @param size the initial size.
- */
- public DirectByteArrayOutputStream(final int size) {
- super(size);
- }
-
- /**
- * Note that serializedBytes include invalid bytes.
- * So we have to use it with the actualLength by using size() whenever needed.
- *
- * @return the buffer where data is stored.
- */
- public byte[] getBufDirectly() {
- return buf;
- }
-}
diff --git a/common/src/main/java/org/apache/nemo/common/DirectByteBufferOutputStream.java b/common/src/main/java/org/apache/nemo/common/DirectByteBufferOutputStream.java
index 7d96dfa..8a74fb2 100644
--- a/common/src/main/java/org/apache/nemo/common/DirectByteBufferOutputStream.java
+++ b/common/src/main/java/org/apache/nemo/common/DirectByteBufferOutputStream.java
@@ -18,8 +18,11 @@
*/
package org.apache.nemo.common;
+import com.google.common.annotations.VisibleForTesting;
+
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -27,15 +30,17 @@ import java.util.List;
* This class is a customized output stream implementation backed by
* {@link ByteBuffer}, which utilizes off heap memory when writing the data.
* Memory is allocated when needed by the specified {@code pageSize}.
+ * Deletion of {@code dataList}, which is the memory this outputstream holds, occurs
+ * when the corresponding block is deleted.
+ * TODO #388: Off-heap memory management (reuse ByteBuffer) - implement reuse.
*/
public final class DirectByteBufferOutputStream extends OutputStream {
private LinkedList<ByteBuffer> dataList = new LinkedList<>();
- private static final int DEFAULT_PAGE_SIZE = 4096;
+ private static final int DEFAULT_PAGE_SIZE = 32768; //32KB
private final int pageSize;
private ByteBuffer currentBuf;
-
/**
* Default constructor.
* Sets the {@code pageSize} as default size of 4096 bytes.
@@ -45,8 +50,9 @@ public final class DirectByteBufferOutputStream extends OutputStream {
}
/**
- * Constructor specifying the {@code size}.
- * Sets the {@code pageSize} as {@code size}.
+ * Constructor which sets {@code pageSize} as specified {@code size}.
+ * Note that the {@code pageSize} has trade-off between memory fragmentation and
+ * native memory (de)allocation overhead.
*
* @param size should be a power of 2 and greater than or equal to 4096.
*/
@@ -62,6 +68,7 @@ public final class DirectByteBufferOutputStream extends OutputStream {
/**
* Allocates new {@link ByteBuffer} with the capacity equal to {@code pageSize}.
*/
+ // TODO #388: Off-heap memory management (reuse ByteBuffer)
private void newLastBuffer() {
dataList.addLast(ByteBuffer.allocateDirect(pageSize));
}
@@ -120,14 +127,15 @@ public final class DirectByteBufferOutputStream extends OutputStream {
}
}
+
/**
* Creates a byte array that contains the whole content currently written in this output stream.
- * Note that this method causes array copy which could degrade performance.
- * TODO #384: For performance issue, implement an input stream so that we do not have to use this method.
*
+ * USED BY TESTS ONLY.
* @return the current contents of this output stream, as byte array.
*/
- public byte[] toByteArray() {
+ @VisibleForTesting
+ byte[] toByteArray() {
if (dataList.isEmpty()) {
final byte[] byteArray = new byte[0];
return byteArray;
@@ -140,39 +148,49 @@ public final class DirectByteBufferOutputStream extends OutputStream {
final int arraySize = pageSize * (dataList.size() - 1) + lastBuf.position();
final byte[] byteArray = new byte[arraySize];
int start = 0;
- int byteToWrite;
-
- for (final ByteBuffer temp : dataList) {
- // ByteBuffer has to be shifted to read mode by calling ByteBuffer.flip(),
- // which sets limit to the current position and sets the position to 0.
- // Note that capacity remains unchanged.
- temp.flip();
- byteToWrite = temp.remaining();
- temp.get(byteArray, start, byteToWrite);
+
+ for (final ByteBuffer buffer : dataList) {
+ // We use duplicated buffer to read the data so that there is no complicated
+ // alteration of position and limit when switching between read and write mode.
+ final ByteBuffer dupBuffer = buffer.duplicate();
+ dupBuffer.flip();
+ final int byteToWrite = dupBuffer.remaining();
+ dupBuffer.get(byteArray, start, byteToWrite);
start += byteToWrite;
}
- // The limit of the last buffer has to be set to the capacity for additional write.
- lastBuf.limit(lastBuf.capacity());
return byteArray;
}
/**
* Returns the list of {@code ByteBuffer}s that contains the written data.
- * Note that by calling this method, the existing list of {@code ByteBuffer}s is cleared.
+ * Each returned {@link ByteBuffer} is a flipped duplicate with its own position and limit,
+ * so reading from it does not move the position of the buffers this stream writes into.
+ * Call this method to read the written data from the beginning; the returned buffers
+ * must not be used for additional writes.
*
* @return the {@code LinkedList} of {@code ByteBuffer}s.
*/
- public List<ByteBuffer> getBufferListAndClear() {
- List<ByteBuffer> result = dataList;
- dataList = new LinkedList<>();
- for (final ByteBuffer buffer : result) {
- buffer.flip();
+ public List<ByteBuffer> getDirectByteBufferList() {
+ List<ByteBuffer> result = new ArrayList<>(dataList.size());
+ for (final ByteBuffer buffer : dataList) {
+ final ByteBuffer dupBuffer = buffer.duplicate();
+ dupBuffer.flip();
+ result.add(dupBuffer);
}
return result;
}
/**
+ * Returns the number of bytes written to this output stream (0 if nothing has been written yet).
+ *
+ * @return the size of the written data in bytes
+ */
+ public int size() {
+ return dataList.isEmpty() ? 0 : pageSize * (dataList.size() - 1) + dataList.getLast().position();
+ }
+
+ /**
* Closing this output stream has no effect.
*/
public void close() {
diff --git a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
index 594d412..847ad8f 100644
--- a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
@@ -18,10 +18,10 @@
*/
package org.apache.nemo.common.coder;
-import org.apache.nemo.common.DirectByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -82,7 +82,7 @@ public final class BytesDecoderFactory implements DecoderFactory<byte[]> {
public byte[] decode() throws IOException {
// We cannot use inputStream.available() to know the length of bytes to read.
// The available method only returns the number of bytes can be read without blocking.
- final DirectByteArrayOutputStream byteOutputStream = new DirectByteArrayOutputStream();
+ final ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
int b = inputStream.read();
while (b != -1) {
byteOutputStream.write(b);
diff --git a/common/src/test/java/org/apache/nemo/common/DirectByteBufferOutputStreamTest.java b/common/src/test/java/org/apache/nemo/common/DirectByteBufferOutputStreamTest.java
index c4dbfb1..564ec5a 100644
--- a/common/src/test/java/org/apache/nemo/common/DirectByteBufferOutputStreamTest.java
+++ b/common/src/test/java/org/apache/nemo/common/DirectByteBufferOutputStreamTest.java
@@ -97,11 +97,11 @@ public class DirectByteBufferOutputStreamTest {
}
@Test
- public void testGetBufferList() {
+ public void testGetDirectBufferList() {
String value = RandomStringUtils.randomAlphanumeric(10000);
outputStream.write(value.getBytes());
byte[] totalOutput = outputStream.toByteArray();
- List<ByteBuffer> bufList = outputStream.getBufferListAndClear();
+ List<ByteBuffer> bufList = outputStream.getDirectByteBufferList();
int offset = 0;
int byteToRead;
for (final ByteBuffer temp : bufList) {
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
index 019358a..01ba02a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
@@ -34,9 +34,13 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.List;
+
+import static io.netty.buffer.Unpooled.wrappedBuffer;
/**
* Container for multiple output streams. Represents a transfer context on sender-side.
@@ -131,25 +135,11 @@ public final class ByteOutputContext extends ByteTransferContext implements Auto
* <p>Public methods are thread safe,
* although the execution order may not be linearized if they were called from different threads.</p>
*/
- public final class ByteOutputStream extends OutputStream {
+ public final class ByteOutputStream implements AutoCloseable {
private volatile boolean newSubStream = true;
private volatile boolean closed = false;
- @Override
- public void write(final int i) throws IOException {
- final ByteBuf byteBuf = channel.alloc().ioBuffer(1, 1);
- byteBuf.writeByte(i);
- writeByteBuf(byteBuf);
- }
-
- @Override
- public void write(final byte[] bytes, final int offset, final int length) throws IOException {
- final ByteBuf byteBuf = channel.alloc().ioBuffer(length, length);
- byteBuf.writeBytes(bytes, offset, length);
- writeByteBuf(byteBuf);
- }
-
/**
* Writes {@link SerializedPartition}.
*
@@ -157,13 +147,38 @@ public final class ByteOutputContext extends ByteTransferContext implements Auto
* @return {@code this}
* @throws IOException when an exception has been set or this stream was closed
*/
- public ByteOutputStream writeSerializedPartition(final SerializedPartition serializedPartition)
+ public ByteOutputStream writeSerializedPartitionBuffer(final SerializedPartition serializedPartition)
throws IOException {
- write(serializedPartition.getData(), 0, serializedPartition.getLength());
+ writeBuffer(serializedPartition.getDirectBufferList());
return this;
}
/**
+ * Wraps each of the {@link ByteBuffer} in the bufList to {@link ByteBuf} object
+ * to write a data frame.
+ *
+ * @param bufList list of {@link ByteBuffer}s to wrap.
+ * @throws IOException when fails to write the data.
+ */
+ public void writeBuffer(final List<ByteBuffer> bufList) throws IOException {
+ final ByteBuf byteBuf = wrappedBuffer(bufList.toArray(new ByteBuffer[bufList.size()]));
+ writeByteBuf(byteBuf);
+ }
+
+
+ /**
+ * Writes a data frame, from {@link ByteBuf}.
+ *
+ * @param byteBuf {@link ByteBuf} to write.
+ * @throws IOException when fails to write data.
+ */
+ private void writeByteBuf(final ByteBuf byteBuf) throws IOException {
+ if (byteBuf.readableBytes() > 0) {
+ writeDataFrame(byteBuf, byteBuf.readableBytes());
+ }
+ }
+
+ /**
* Writes a data frame from {@link FileArea}.
*
* @param fileArea the {@link FileArea} to transfer
@@ -197,17 +212,6 @@ public final class ByteOutputContext extends ByteTransferContext implements Auto
}
/**
- * Writes a data frame, from {@link ByteBuf}.
- *
- * @param byteBuf {@link ByteBuf} to write.
- */
- private void writeByteBuf(final ByteBuf byteBuf) throws IOException {
- if (byteBuf.readableBytes() > 0) {
- writeDataFrame(byteBuf, byteBuf.readableBytes());
- }
- }
-
- /**
* Write an element to the channel.
*
* @param element element
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
index 6aed5f1..8806432 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -356,7 +356,7 @@ public final class BlockManagerWorker {
final Iterable<SerializedPartition> partitions = optionalBlock.get().readSerializedPartitions(keyRange);
for (final SerializedPartition partition : partitions) {
try (ByteOutputContext.ByteOutputStream os = outputContext.newOutputStream()) {
- os.writeSerializedPartition(partition);
+ os.writeSerializedPartitionBuffer(partition);
}
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
index 794e8c8..f0362dc 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
@@ -19,7 +19,8 @@
package org.apache.nemo.runtime.executor.data;
import com.google.common.io.CountingInputStream;
-import org.apache.nemo.common.DirectByteArrayOutputStream;
+import org.apache.nemo.common.ByteBufferInputStream;
+import org.apache.nemo.common.DirectByteBufferOutputStream;
import org.apache.nemo.common.coder.DecoderFactory;
import org.apache.nemo.common.coder.EncoderFactory;
import org.apache.nemo.runtime.executor.data.partition.NonSerializedPartition;
@@ -31,6 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
+import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -112,8 +114,8 @@ public final class DataUtil {
final List<SerializedPartition<K>> serializedPartitions = new ArrayList<>();
for (final NonSerializedPartition<K> partitionToConvert : partitionsToConvert) {
try (
- DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream();
- OutputStream wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
+ DirectByteBufferOutputStream bytesOutputStream = new DirectByteBufferOutputStream();
+ OutputStream wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers())
) {
serializePartition(serializer.getEncoderFactory(), partitionToConvert, wrappedStream);
// We need to close wrappedStream on here, because DirectByteArrayOutputStream:getBufDirectly() returns
@@ -121,10 +123,10 @@ public final class DataUtil {
wrappedStream.close();
// Note that serializedBytes include invalid bytes.
// So we have to use it with the actualLength by using size() whenever needed.
- final byte[] serializedBytes = bytesOutputStream.getBufDirectly();
+ final List<ByteBuffer> serializedBufList = bytesOutputStream.getDirectByteBufferList();
final int actualLength = bytesOutputStream.size();
serializedPartitions.add(
- new SerializedPartition<>(partitionToConvert.getKey(), serializedBytes, actualLength));
+ new SerializedPartition<>(partitionToConvert.getKey(), serializedBufList, actualLength));
}
}
return serializedPartitions;
@@ -146,12 +148,12 @@ public final class DataUtil {
final List<NonSerializedPartition<K>> nonSerializedPartitions = new ArrayList<>();
for (final SerializedPartition<K> partitionToConvert : partitionsToConvert) {
final K key = partitionToConvert.getKey();
+ try (InputStream inputStream = partitionToConvert.isOffheap()
+ ? new ByteBufferInputStream(partitionToConvert.getDirectBufferList())
+ : new ByteArrayInputStream(partitionToConvert.getData())) {
-
- try (ByteArrayInputStream byteArrayInputStream =
- new ByteArrayInputStream(partitionToConvert.getData())) {
final NonSerializedPartition<K> deserializePartition = deserializePartition(
- partitionToConvert.getLength(), serializer, key, byteArrayInputStream);
+ partitionToConvert.getLength(), serializer, key, inputStream);
nonSerializedPartitions.add(deserializePartition);
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
index cac065c..65b167b 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
@@ -34,10 +34,23 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
-import java.io.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
/**
* This class represents a block which is stored in (local or remote) file.
@@ -83,11 +96,13 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
*/
private void writeToFile(final Iterable<SerializedPartition<K>> serializedPartitions)
throws IOException {
- try (FileOutputStream fileOutputStream = new FileOutputStream(filePath, true)) {
+ try (FileChannel fileOutputChannel = new FileOutputStream(filePath, true).getChannel()) {
for (final SerializedPartition<K> serializedPartition : serializedPartitions) {
// Reserve a partition write and get the metadata.
metadata.writePartitionMetadata(serializedPartition.getKey(), serializedPartition.getLength());
- fileOutputStream.write(serializedPartition.getData(), 0, serializedPartition.getLength());
+ for (final ByteBuffer buffer: serializedPartition.getDirectBufferList()) {
+ fileOutputChannel.write(buffer);
+ }
}
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
index e2f57a5..070b618 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
@@ -18,7 +18,7 @@
*/
package org.apache.nemo.runtime.executor.data.partition;
-import org.apache.nemo.common.DirectByteArrayOutputStream;
+import org.apache.nemo.common.DirectByteBufferOutputStream;
import org.apache.nemo.common.coder.EncoderFactory;
import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
import org.slf4j.Logger;
@@ -27,12 +27,17 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import static org.apache.nemo.runtime.executor.data.DataUtil.buildOutputStream;
/**
* A collection of data elements. The data is stored as an array of bytes.
* This is a unit of read / write towards {@link org.apache.nemo.runtime.executor.data.block.Block}s.
+ * Releasing the memory(either off-heap or on-heap) occurs on block deletion.
+ * TODO #396: Refactoring SerializedPartition into multiple classes
*
* @param <K> the key type of its partitions.
*/
@@ -45,11 +50,13 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
private volatile boolean committed;
// Will be null when the partition is committed when it is constructed.
@Nullable
- private final DirectByteArrayOutputStream bytesOutputStream;
+ private final DirectByteBufferOutputStream bytesOutputStream;
@Nullable
private final OutputStream wrappedStream;
@Nullable
private final EncoderFactory.Encoder encoder;
+ private volatile List<ByteBuffer> dataList;
+ private final boolean offheap;
/**
* Creates a serialized {@link Partition} without actual data.
@@ -65,13 +72,14 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
this.serializedData = new byte[0];
this.length = 0;
this.committed = false;
- this.bytesOutputStream = new DirectByteArrayOutputStream();
+ this.bytesOutputStream = new DirectByteBufferOutputStream();
this.wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
this.encoder = serializer.getEncoderFactory().create(wrappedStream);
+ this.offheap = true;
}
/**
- * Creates a serialized {@link Partition} with actual data.
+ * Creates a serialized {@link Partition} with actual data residing in on-heap region.
* Data cannot be written to this partition after the construction.
*
* @param key the key.
@@ -88,6 +96,28 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
this.bytesOutputStream = null;
this.wrappedStream = null;
this.encoder = null;
+ this.offheap = false;
+ }
+
+ /**
+ * Creates a serialized {@link Partition} with actual data residing in off-heap region.
+ * Data cannot be written to this partition after the construction; the given buffers are stored without copying.
+ *
+ * @param key the key.
+ * @param serializedBufList the serialized data, as a list of {@link ByteBuffer}s.
+ * @param length the length of the actual serialized data; expected to equal the total readable bytes of serializedBufList.
+ */
+ public SerializedPartition(final K key,
+ final List<ByteBuffer> serializedBufList,
+ final int length) {
+ this.key = key;
+ this.dataList = serializedBufList;
+ this.length = length;
+ this.committed = true;
+ this.bytesOutputStream = null;
+ this.wrappedStream = null;
+ this.encoder = null;
+ this.offheap = true;
+ }
/**
@@ -120,8 +150,7 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
// We need to close wrappedStream on here, because DirectByteArrayOutputStream:getBufDirectly() returns
// inner buffer directly, which can be an unfinished(not flushed) buffer.
wrappedStream.close();
- this.serializedData = bytesOutputStream.getBufDirectly();
-
+ this.dataList = bytesOutputStream.getDirectByteBufferList();
this.length = bytesOutputStream.size();
this.committed = true;
}
@@ -144,6 +173,8 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
}
/**
+ * This method should only be used when this partition is residing in on-heap region.
+ *
* @return the serialized data.
* @throws IOException if the partition is not committed yet.
*/
@@ -151,12 +182,33 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
public byte[] getData() throws IOException {
if (!committed) {
throw new IOException("The partition is not committed yet!");
+ } else if (offheap) {
+ throw new RuntimeException("This partition does not have on-heap data");
} else {
return serializedData;
}
}
/**
+ * Returns the serialized data as {@link ByteBuffer}s with independent positions, for on-heap and off-heap partitions.
+ *
+ * @return duplicates of the off-heap buffers, or a single buffer wrapping the valid on-heap bytes
+ * @throws IOException if the partition is not committed yet.
+ */
+ public List<ByteBuffer> getDirectBufferList() throws IOException {
+ if (!committed) {
+ throw new IOException("The partition is not committed yet!");
+ } else if (!offheap) {
+ return java.util.Collections.singletonList(ByteBuffer.wrap(serializedData, 0, length));
+ }
+ final List<ByteBuffer> result = new ArrayList<>(dataList.size());
+ for (final ByteBuffer buffer : dataList) {
+ result.add(buffer.duplicate());
+ }
+ return result;
+ }
+
+ /**
* @return the length of the actual data.
* @throws IOException if the partition is not committed yet.
*/
@@ -167,4 +219,11 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
return length;
}
}
+
+ /**
+ * @return whether this {@code SerializedPartition} is residing in off-heap region.
+ */
+ public boolean isOffheap() {
+ return offheap;
+ }
}