You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by ms...@apache.org on 2019/02/15 14:39:36 UTC

[hadoop] branch trunk updated: HDDS-1082. OutOfMemoryError because of memory leak in KeyInputStream. Contributed by Supratim Deka.

This is an automated email from the ASF dual-hosted git repository.

msingh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9584b47  HDDS-1082. OutOfMemoryError because of memory leak in KeyInputStream. Contributed by Supratim Deka.
9584b47 is described below

commit 9584b47e038bca06672bd946499d3e3ab476a4e7
Author: Mukul Kumar Singh <ms...@apache.org>
AuthorDate: Fri Feb 15 20:09:15 2019 +0530

    HDDS-1082. OutOfMemoryError because of memory leak in KeyInputStream. Contributed by Supratim Deka.
---
 .../hadoop/hdds/scm/storage/BlockInputStream.java  | 119 +++++++++++++++++----
 1 file changed, 100 insertions(+), 19 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 001feda..d53d20e 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
@@ -105,8 +106,21 @@ public class BlockInputStream extends InputStream implements Seekable {
       throws IOException {
     checkOpen();
     int available = prepareRead(1);
-    return available == EOF ? EOF :
-        Byte.toUnsignedInt(buffers.get(bufferIndex).get());
+    int dataout = EOF;
+
+    if (available == EOF) {
+      Preconditions.checkState (buffers == null); //should have released by now, see below
+    } else {
+      dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
+    }
+
+    if (blockStreamEOF()) {
+      // consumer might use getPos to determine EOF,
+      // so release buffers when serving the last byte of data
+      releaseBuffers();
+    }
+
+    return dataout;
   }
 
   @Override
@@ -135,15 +149,45 @@ public class BlockInputStream extends InputStream implements Seekable {
     while (len > 0) {
       int available = prepareRead(len);
       if (available == EOF) {
+        Preconditions.checkState(buffers == null); //should have been released by now
         return total != 0 ? total : EOF;
       }
       buffers.get(bufferIndex).get(b, off + total, available);
       len -= available;
       total += available;
     }
+
+    if (blockStreamEOF()) {
+      // smart consumers determine EOF by calling getPos()
+      // so we release buffers when serving the final bytes of data
+      releaseBuffers();
+    }
+
     return total;
   }
 
+  /**
+   * Determines if all data in the block stream has been consumed.
+   * @return true if EOF, false if more data is available
+   * @throws IllegalStateException if out of data before the last chunk
+   */
+  private boolean blockStreamEOF() {
+    if (buffersHaveData() || chunksRemaining()) { // may move bufferIndex
+      return false;
+    } else {
+      // No unread data left; if the block has chunks, we must be on the last.
+      Preconditions.checkState (((chunks == null) || chunks.isEmpty() ||
+              chunkIndex == (chunks.size() - 1)), "EOF detected, but not at the last chunk");
+      return true;
+    }
+  }
+
+  private void releaseBuffers() {
+    // Drop our reference to the chunk data so it can be garbage collected.
+    buffers = null; // buffersHaveData() treats null as "no data"
+    bufferIndex = 0;
+  }
+
   @Override
   public synchronized void close() {
     if (xceiverClientManager != null && xceiverClient != null) {
@@ -173,23 +217,11 @@ public class BlockInputStream extends InputStream implements Seekable {
    */
   private synchronized int prepareRead(int len) throws IOException {
     for (;;) {
-      if (chunks == null || chunks.isEmpty()) {
-        // This must be an empty key.
-        return EOF;
-      } else if (buffers == null) {
-        // The first read triggers fetching the first chunk.
-        readChunkFromContainer();
-      } else if (!buffers.isEmpty() &&
-          buffers.get(bufferIndex).hasRemaining()) {
-        // Data is available from the current buffer.
+      if (buffersHaveData()) {
+        // Data is available from buffers
         ByteBuffer bb = buffers.get(bufferIndex);
         return len > bb.remaining() ? bb.remaining() : len;
-      } else if (!buffers.isEmpty() &&
-          !buffers.get(bufferIndex).hasRemaining() &&
-          bufferIndex < buffers.size() - 1) {
-        // There are additional buffers available.
-        ++bufferIndex;
-      } else if (chunkIndex < chunks.size() - 1) {
+      } else if (chunksRemaining()) {
         // There are additional chunks available.
         readChunkFromContainer();
       } else {
@@ -199,6 +231,44 @@ public class BlockInputStream extends InputStream implements Seekable {
     }
   }
 
+  private boolean buffersHaveData() {
+    boolean hasData = false;
+    // Returns true if a buffer at or after bufferIndex has unread bytes.
+    if (buffers == null || buffers.isEmpty()) {
+      return false;
+    }
+    // Side effect: advances bufferIndex past fully consumed buffers.
+    while (bufferIndex < (buffers.size())) {
+      if (buffers.get(bufferIndex).hasRemaining()) {
+        // current buffer still has unread bytes
+        hasData = true;
+        break;
+      } else {
+        if (buffersRemaining()) {
+          // advance to the next buffer; buffersRemaining() ensures one exists
+          ++bufferIndex;
+          Preconditions.checkState (bufferIndex < buffers.size());
+        } else {
+          // last buffer is exhausted; bufferIndex stays on it
+          break;
+        }
+      }
+    }
+
+    return hasData;
+  }
+
+  private boolean buffersRemaining() { // is there a buffer after bufferIndex?
+    return (bufferIndex < (buffers.size() - 1)); // requires buffers != null
+  }
+
+  private boolean chunksRemaining() { // is there a chunk after chunkIndex?
+    if ((chunks == null) || chunks.isEmpty()) { // e.g. an empty key
+      return false;
+    }
+    return (chunkIndex < (chunks.size() - 1)); // true at chunkIndex == -1
+  }
+
   /**
    * Attempts to read the chunk at the specified offset in the chunk list.  If
    * successful, then the data of the read chunk is saved so that its bytes can
@@ -311,8 +381,19 @@ public class BlockInputStream extends InputStream implements Seekable {
 
   @Override
   public synchronized long getPos() throws IOException {
-    return chunkIndex == -1 ? 0 :
-        chunkOffset[chunkIndex] + buffers.get(bufferIndex).position();
+    if (chunkIndex == -1) {
+      // Nothing consumed yet (a new stream, or after a seek).
+      return 0;
+    }
+
+    if (blockStreamEOF()) {
+      // All data consumed; buffers may already be released, so derive the
+      // position from the last chunk's offset plus its full length.
+      return chunkOffset[chunkIndex] + chunks.get(chunkIndex).getLen();
+    }
+    // Mid-chunk: chunk offset + position within the current buffer.
+    // NOTE(review): sizes of buffers before bufferIndex are not added in.
+    return chunkOffset[chunkIndex] + buffers.get(bufferIndex).position();
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org