You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2019/02/06 16:46:00 UTC

[jira] [Resolved] (SPARK-26734) StackOverflowError on WAL serialization caused by large receivedBlockQueue

     [ https://issues.apache.org/jira/browse/SPARK-26734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-26734.
-------------------------------
       Resolution: Fixed
    Fix Version/s: 2.3.4
                   2.4.1
                   3.0.0

Issue resolved by pull request 23716
[https://github.com/apache/spark/pull/23716]

> StackOverflowError on WAL serialization caused by large receivedBlockQueue
> --------------------------------------------------------------------------
>
>                 Key: SPARK-26734
>                 URL: https://issues.apache.org/jira/browse/SPARK-26734
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, DStreams
>    Affects Versions: 2.3.1, 2.3.2, 2.4.0
>         Environment: spark 2.4.0 streaming job
> java 1.8
> scala 2.11.12
>            Reporter: Ross M. Lodge
>            Assignee: Ross M. Lodge
>            Priority: Major
>             Fix For: 3.0.0, 2.4.1, 2.3.4
>
>
> We encountered an intermittent StackOverflowError with a stack trace similar to:
>  
> {noformat}
> Exception in thread "JobGenerator" java.lang.StackOverflowError
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509){noformat}
> The name of the thread has been seen to be either "JobGenerator" or "streaming-start", depending on when in the lifecycle of the job the problem occurs.  It appears to only occur in streaming jobs with checkpointing and WAL enabled; this has prevented us from upgrading to v2.4.0.
>  
> Via debugging, we tracked this down to allocateBlocksToBatch in ReceivedBlockTracker:
> {code:java}
> /**
>  * Allocate all unallocated blocks to the given batch.
>  * This event will get written to the write ahead log (if enabled).
>  */
> def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
>   if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
>     val streamIdToBlocks = streamIds.map { streamId =>
>       (streamId, getReceivedBlockQueue(streamId).clone())
>     }.toMap
>     val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
>     if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
>       streamIds.foreach(getReceivedBlockQueue(_).clear())
>       timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
>       lastAllocatedBatchTime = batchTime
>     } else {
>       logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
>     }
>   } else {
>     // This situation occurs when:
>     // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
>     // possibly processed batch job or half-processed batch job need to be processed again,
>     // so the batchTime will be equal to lastAllocatedBatchTime.
>     // 2. Slow checkpointing makes recovered batch time older than WAL recovered
>     // lastAllocatedBatchTime.
>     // This situation will only occurs in recovery time.
>     logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
>   }
> }
> {code}
> Prior to 2.3.1, this code did
> {code:java}
> getReceivedBlockQueue(streamId).dequeueAll(x => true){code}
> but it was changed as part of SPARK-23991 to
> {code:java}
> getReceivedBlockQueue(streamId).clone(){code}
> We've not been able to reproduce this in a test of the actual above method, but we've been able to produce a test that reproduces it by putting a lot of values into the queue:
>  
> {code:java}
> class SerializationFailureTest extends FunSpec {
>   private val logger = LoggerFactory.getLogger(getClass)
>   private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
>   describe("Queue") {
>     it("should be serializable") {
>       runTest(1062)
>     }
>     it("should not be serializable") {
>       runTest(1063)
>     }
>     it("should DEFINITELY not be serializable") {
>       runTest(199952)
>     }
>   }
>   private def runTest(mx: Int): Array[Byte] = {
>     try {
>       val random = new scala.util.Random()
>       val queue = new ReceivedBlockQueue()
>       for (_ <- 0 until mx) {
>         queue += ReceivedBlockInfo(
>           streamId = 0,
>           numRecords = Some(random.nextInt(5)),
>           metadataOption = None,
>           blockStoreResult = WriteAheadLogBasedStoreResult(
>             blockId = StreamBlockId(0, random.nextInt()),
>             numRecords = Some(random.nextInt(5)),
>             walRecordHandle = FileBasedWriteAheadLogSegment(
>               path = s"""hdfs://foo.bar.com:8080/spark/streaming/BAZ/00007/receivedData/0/log-${random.nextInt()}-${random.nextInt()}""",
>               offset = random.nextLong(),
>               length = random.nextInt()
>             )
>           )
>         )
>       }
>       val record = BatchAllocationEvent(
>         Time(1548320400000L), AllocatedBlocks(
>           Map(
>             0 -> queue
>           )
>         )
>       )
>       Utils.serialize(record)
>     } catch {
>       case t: Throwable =>
>         fail(t)
>     }
>   }
> }
> {code}
> In my tests it seemed like the serialization would fail if there were ~1064 elements in the queue.  I'm _assuming_ that this is actually a scala bug, though I haven't tried reproducing it without the involvement of the spark objects.
> I expect this could be solved by transforming the cloned queue into a different type of Seq.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org