You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ms...@apache.org on 2019/02/15 14:39:36 UTC
[hadoop] branch trunk updated: HDDS-1082. OutOfMemoryError because
of memory leak in KeyInputStream. Contributed by Supratim Deka.
This is an automated email from the ASF dual-hosted git repository.
msingh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9584b47 HDDS-1082. OutOfMemoryError because of memory leak in KeyInputStream. Contributed by Supratim Deka.
9584b47 is described below
commit 9584b47e038bca06672bd946499d3e3ab476a4e7
Author: Mukul Kumar Singh <ms...@apache.org>
AuthorDate: Fri Feb 15 20:09:15 2019 +0530
HDDS-1082. OutOfMemoryError because of memory leak in KeyInputStream. Contributed by Supratim Deka.
---
.../hadoop/hdds/scm/storage/BlockInputStream.java | 119 +++++++++++++++++----
1 file changed, 100 insertions(+), 19 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 001feda..d53d20e 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.storage;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
@@ -105,8 +106,21 @@ public class BlockInputStream extends InputStream implements Seekable {
throws IOException {
checkOpen();
int available = prepareRead(1);
- return available == EOF ? EOF :
- Byte.toUnsignedInt(buffers.get(bufferIndex).get());
+ int dataout = EOF;
+
+ if (available == EOF) {
+ Preconditions.checkState (buffers == null); //should have released by now, see below
+ } else {
+ dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
+ }
+
+ if (blockStreamEOF()) {
+ // consumer might use getPos to determine EOF,
+ // so release buffers when serving the last byte of data
+ releaseBuffers();
+ }
+
+ return dataout;
}
@Override
@@ -135,15 +149,45 @@ public class BlockInputStream extends InputStream implements Seekable {
while (len > 0) {
int available = prepareRead(len);
if (available == EOF) {
+ Preconditions.checkState(buffers == null); //should have been released by now
return total != 0 ? total : EOF;
}
buffers.get(bufferIndex).get(b, off + total, available);
len -= available;
total += available;
}
+
+ if (blockStreamEOF()) {
+ // smart consumers determine EOF by calling getPos()
+ // so we release buffers when serving the final bytes of data
+ releaseBuffers();
+ }
+
return total;
}
+ /**
+ * Determines if all data in the stream has been consumed.
+ *
+ * @return true if EOF, false if more data is available
+ */
+ private boolean blockStreamEOF() {
+ if (buffersHaveData() || chunksRemaining()) {
+ return false;
+ } else {
+ // if there are any chunks, we must be at the last chunk for EOF
+ Preconditions.checkState (((chunks == null) || chunks.isEmpty() ||
+ chunkIndex == (chunks.size() - 1)), "EOF detected, but not at the last chunk");
+ return true;
+ }
+ }
+
+ private void releaseBuffers() {
+ // drop the buffer references so the consumed chunk data can be reclaimed
+ buffers = null;
+ bufferIndex = 0;
+ }
+
@Override
public synchronized void close() {
if (xceiverClientManager != null && xceiverClient != null) {
@@ -173,23 +217,11 @@ public class BlockInputStream extends InputStream implements Seekable {
*/
private synchronized int prepareRead(int len) throws IOException {
for (;;) {
- if (chunks == null || chunks.isEmpty()) {
- // This must be an empty key.
- return EOF;
- } else if (buffers == null) {
- // The first read triggers fetching the first chunk.
- readChunkFromContainer();
- } else if (!buffers.isEmpty() &&
- buffers.get(bufferIndex).hasRemaining()) {
- // Data is available from the current buffer.
+ if (buffersHaveData()) {
+ // Data is available from buffers
ByteBuffer bb = buffers.get(bufferIndex);
return len > bb.remaining() ? bb.remaining() : len;
- } else if (!buffers.isEmpty() &&
- !buffers.get(bufferIndex).hasRemaining() &&
- bufferIndex < buffers.size() - 1) {
- // There are additional buffers available.
- ++bufferIndex;
- } else if (chunkIndex < chunks.size() - 1) {
+ } else if (chunksRemaining()) {
// There are additional chunks available.
readChunkFromContainer();
} else {
@@ -199,6 +231,44 @@ public class BlockInputStream extends InputStream implements Seekable {
}
}
+ private boolean buffersHaveData() {
+ boolean hasData = false;
+
+ if (buffers == null || buffers.isEmpty()) {
+ return false;
+ }
+
+ while (bufferIndex < (buffers.size())) {
+ if (buffers.get(bufferIndex).hasRemaining()) {
+ // current buffer has data
+ hasData = true;
+ break;
+ } else {
+ if (buffersRemaining()) {
+ // move to next available buffer
+ ++bufferIndex;
+ Preconditions.checkState (bufferIndex < buffers.size());
+ } else {
+ // no more buffers remaining
+ break;
+ }
+ }
+ }
+
+ return hasData;
+ }
+
+ private boolean buffersRemaining() {
+ return (bufferIndex < (buffers.size() - 1));
+ }
+
+ private boolean chunksRemaining() {
+ if ((chunks == null) || chunks.isEmpty()) {
+ return false;
+ }
+ return (chunkIndex < (chunks.size() - 1));
+ }
+
/**
* Attempts to read the chunk at the specified offset in the chunk list. If
* successful, then the data of the read chunk is saved so that its bytes can
@@ -311,8 +381,19 @@ public class BlockInputStream extends InputStream implements Seekable {
@Override
public synchronized long getPos() throws IOException {
- return chunkIndex == -1 ? 0 :
- chunkOffset[chunkIndex] + buffers.get(bufferIndex).position();
+ if (chunkIndex == -1) {
+ // no data consumed yet, a new stream OR after seek
+ return 0;
+ }
+
+ if (blockStreamEOF()) {
+ // all data consumed, buffers have been released.
+ // get position from the chunk offset and chunk length of last chunk
+ return chunkOffset[chunkIndex] + chunks.get(chunkIndex).getLen();
+ }
+
+ // get position from available buffers of current chunk
+ return chunkOffset[chunkIndex] + buffers.get(bufferIndex).position();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org