Posted to dev@spark.apache.org by 吴晓菊 <ch...@gmail.com> on 2018/07/05 15:05:11 UTC

Loading and release of shuffle blocks: what does the TODO in the comment mean?

From reading the code of ShuffleBlockFetcherIterator, it seems to load all the
shuffle blocks needed by a task at once and release all of them only when the
task finishes.

Please correct me if I'm wrong.

If my understanding is correct, does that mean those shuffle blocks stay in
memory even after some of them have been fully read?
Would it be possible to improve this part?
I do see a TODO in the code comments, but I am not sure whether it refers to
the same thing.
The code is pasted here:

val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
try {
  // Decompress the whole block at once to detect any corruption, which could increase
  // the memory usage and potentially increase the chance of OOM.
  // TODO: manage the memory used here, and spill it into disk in case of OOM.
  Utils.copyStream(input, out)
  out.close()
  input = out.toChunkedByteBuffer.toInputStream(dispose = true)
} catch {
  case e: IOException =>
    buf.release()
    if (buf.isInstanceOf[FileSegmentManagedBuffer]
      || corruptedBlocks.contains(blockId)) {
      throwFetchFailedException(blockId, address, e)
    } else {
      logWarning(s"got an corrupted block $blockId from $address, fetch again", e)
      corruptedBlocks += blockId
      fetchRequests += FetchRequest(address, Array((blockId, size)))
      result = null
    }
} finally {
  // TODO: release the buf here to free memory earlier
  originalInput.close()
  in.close()
}
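
To make the second TODO concrete, here is a minimal sketch of what "release
the buf here to free memory earlier" could look like: the source buffer is
released as soon as its contents have been copied, rather than at the end of
the task. CountedBuffer and EarlyRelease.copyAndRelease are hypothetical names
used only for illustration; they are not part of Spark, and the real code path
may have constraints (retries, reference counting across threads) that this
ignores.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}

// Hypothetical stand-in for a reference-counted buffer; NOT Spark's ManagedBuffer.
final class CountedBuffer(bytes: Array[Byte]) {
  private var refs = 1
  def createInputStream(): InputStream = new ByteArrayInputStream(bytes)
  def release(): Unit = { if (refs > 0) refs -= 1 }
  def isReleased: Boolean = refs == 0
}

object EarlyRelease {
  // Copy the whole block into a fresh array, then release the source buffer
  // immediately instead of holding it until the task finishes.
  def copyAndRelease(buf: CountedBuffer): Array[Byte] = {
    val in = buf.createInputStream()
    val out = new ByteArrayOutputStream()
    try {
      val chunk = new Array[Byte](8192)
      var n = in.read(chunk)
      while (n != -1) {
        out.write(chunk, 0, n)
        n = in.read(chunk)
      }
      out.toByteArray
    } finally {
      in.close()
      buf.release() // the source is no longer needed once it has been copied
    }
  }
}
```

With this shape, the copy still doubles memory use while it is in progress, but
the original buffer becomes eligible for reuse or collection right after the
copy instead of staying referenced for the remainder of the task.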


Also, if the shuffle blocks are kept in memory for a long time, they may be
promoted to the old generation and then affect full GC.

Chrysan Wu
Phone:+86 17717640807