You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by davies <gi...@git.apache.org> on 2016/03/30 22:18:44 UTC

[GitHub] spark pull request: [SPARK-13992] Add support for off-heap caching

Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11805#discussion_r57956249
  
    --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala ---
    @@ -67,33 +73,41 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
     
       @inline
       private def allocateNewChunkIfNeeded(): Unit = {
    +    require(!toChunkedByteBufferWasCalled, "cannot write after toChunkedByteBuffer() is called")
         if (position == chunkSize) {
    -      chunks += new Array[Byte](chunkSize)
    +      chunks += allocator(chunkSize)
           lastChunkIndex += 1
           position = 0
         }
       }
     
    -  def toArrays: Array[Array[Byte]] = {
    +  def toChunkedByteBuffer: ChunkedByteBuffer = {
    +    require(!toChunkedByteBufferWasCalled, "toChunkedByteBuffer() can only be called once")
    +    toChunkedByteBufferWasCalled = true
         if (lastChunkIndex == -1) {
    -      new Array[Array[Byte]](0)
    +      new ChunkedByteBuffer(Array.empty[ByteBuffer])
         } else {
           // Copy the first n-1 chunks to the output, and then create an array that fits the last chunk.
           // An alternative would have been returning an array of ByteBuffers, with the last buffer
           // bounded to only the last chunk's position. However, given our use case in Spark (to put
           // the chunks in block manager), only limiting the view bound of the buffer would still
           // require the block manager to store the whole chunk.
    -      val ret = new Array[Array[Byte]](chunks.size)
    +      val ret = new Array[ByteBuffer](chunks.size)
           for (i <- 0 until chunks.size - 1) {
             ret(i) = chunks(i)
    +        ret(i).flip()
           }
           if (position == chunkSize) {
             ret(lastChunkIndex) = chunks(lastChunkIndex)
    +        ret(lastChunkIndex).flip()
           } else {
    -        ret(lastChunkIndex) = new Array[Byte](position)
    -        System.arraycopy(chunks(lastChunkIndex), 0, ret(lastChunkIndex), 0, position)
    +        ret(lastChunkIndex) = allocator(position)
    +        chunks(lastChunkIndex).flip()
    +        ret(lastChunkIndex).put(chunks(lastChunkIndex))
    +        ret(lastChunkIndex).flip()
    +        StorageUtils.dispose(chunks(lastChunkIndex))
           }
    -      ret
    +     new ChunkedByteBuffer(ret)
    --- End diff --
    
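    Indentation: the added line `new ChunkedByteBuffer(ret)` is indented one space short (5 spaces instead of 6) compared with the surrounding statements.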


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not
working, please contact infrastructure at infrastructure@apache.org or file
a JIRA ticket with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org