Posted to reviews@spark.apache.org by "JoshRosen (via GitHub)" <gi...@apache.org> on 2023/08/31 20:14:51 UTC

[GitHub] [spark] JoshRosen commented on pull request #42742: [SPARK-45025] Allow block manager memory store iterator to handle thread interrupt and perform task completion gracefully

JoshRosen commented on PR #42742:
URL: https://github.com/apache/spark/pull/42742#issuecomment-1701720571

   > So it seems freeing the memory is not a problem, but if we return the InterruptedException, we would still be risking leaking the direct buffers, since we won't get a chance to register this task completion listener? Do you think this is safe/handled elsewhere in the caller when the exception is received?
   
   If I understand correctly, I think this might be a pre-existing risk that we're making worse: nothing prevented the old code, either, from throwing arbitrary exceptions while computing the iterator elements.
   
   I wonder whether we should aim to fix that pre-existing bug at a higher level. In `putIteratorAsBytes`, we have 
   
   https://github.com/apache/spark/blob/e72ce91250a9a2c40fd5ed55a50dbc46e4e7e46d/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala#L349-L367
   
   I'm wondering whether we can restructure that code to wrap the `putIterator` call and dispose of the `valuesHolder`'s resources if `putIterator` fails. Something along these lines (borrowing some code and comments from elsewhere in this part of Spark):
   
   ```scala
   val putIteratorResult = Utils.tryWithSafeFinallyAndFailureCallbacks {
     putIterator(blockId, values, classTag, memoryMode, valuesHolder)
   }(catchBlock = {
     // We want to close the output stream in order to free any resources associated with the
     // serializer itself (such as Kryo's internal buffers). close() might cause data to be
     // written, so redirect the output stream to discard that data.
     valuesHolder.redirectableStream.setOutputStream(ByteStreams.nullOutputStream())
     valuesHolder.serializationStream.close()
     valuesHolder.bbos.close()
     valuesHolder.bbos.toChunkedByteBuffer.dispose()
   })

   putIteratorResult match {
     case Right(storedSize) => Right(storedSize)
     case Left(unrollMemoryUsedByThisBlock) =>
       Left(new PartiallySerializedBlock(
         this,
         serializerManager,
         blockId,
         valuesHolder.serializationStream,
         valuesHolder.redirectableStream,
         unrollMemoryUsedByThisBlock,
         memoryMode,
         valuesHolder.bbos,
         values,
         classTag))
   }
   ```
   
   If the `putIterator()` call fails, then the `catchBlock` will try to close the serialization stream and dispose of the partially-written block. I used `tryWithSafeFinallyAndFailureCallbacks` because there's non-trivial cleanup work taking place in the catch block and I didn't want a failure during that cleanup to suppress the original task exception.
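   
   To make that concrete, here's a minimal sketch of the underlying pattern (not Spark's actual implementation, which also runs task failure callbacks): if the cleanup itself throws, the cleanup failure is attached as a suppressed exception instead of masking the original one.
   
   ```scala
   def tryWithFailureCallback[T](block: => T)(catchBlock: => Unit): T = {
     try {
       block
     } catch {
       case original: Throwable =>
         try {
           catchBlock // e.g. closing streams and disposing buffers
         } catch {
           case fromCleanup: Throwable =>
             // Keep the original task failure as the primary exception and
             // record the cleanup failure alongside it.
             original.addSuppressed(fromCleanup)
         }
         throw original
     }
   }
   ```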
   
   I scoped the `try` block to exclude the `new PartiallySerializedBlock` because I wanted to avoid the possibility that two different pieces of cleanup logic (the task completion callback and the catch block) both call `toChunkedByteBuffer()`.
   
   ---
   
   As I look further into the pre-existing code, I'm spotting a couple of other cases where we're not guaranteed to perform proper cleanup. For example, it's not straightforwardly obvious that the serialization stream will be closed if downstream partial-unrolling code fails.
   
   To cover those cases, I think we should add new unit tests to `MemoryStoreSuite` for scenarios where the iterator being stored throws exceptions at various points.
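   
   Roughly the shape I have in mind (a sketch only: it assumes the suite's existing `makeMemoryStore` helper and task-context setup, and the final assertion is the property we'd want to pin down):
   
   ```scala
   test("putIteratorAsBytes releases resources when the iterator throws") {
     val (memoryStore, _) = makeMemoryStore(12000)

     // An iterator that fails partway through, simulating a task that dies
     // (or is interrupted) while its values are being unrolled.
     def failingIterator(failAfter: Int): Iterator[Int] = new Iterator[Int] {
       private var i = 0
       override def hasNext: Boolean = true
       override def next(): Int = {
         if (i >= failAfter) throw new RuntimeException("failure mid-iteration")
         i += 1
         i
       }
     }

     intercept[RuntimeException] {
       memoryStore.putIteratorAsBytes(
         BlockId("rdd_1_1"), failingIterator(failAfter = 10),
         ClassTag.Int, MemoryMode.ON_HEAP)
     }

     // After the failure, all unroll memory should have been released.
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
   }
   ```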

