You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Gabor Somogyi (JIRA)" <ji...@apache.org> on 2019/02/01 12:34:00 UTC
[jira] [Updated] (SPARK-26734) StackOverflowError on WAL
serialization caused by large receivedBlockQueue
[ https://issues.apache.org/jira/browse/SPARK-26734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gabor Somogyi updated SPARK-26734:
----------------------------------
Component/s: DStreams
> StackOverflowError on WAL serialization caused by large receivedBlockQueue
> --------------------------------------------------------------------------
>
> Key: SPARK-26734
> URL: https://issues.apache.org/jira/browse/SPARK-26734
> Project: Spark
> Issue Type: Bug
> Components: Block Manager, DStreams
> Affects Versions: 2.3.1, 2.3.2, 2.4.0
> Environment: spark 2.4.0 streaming job
> java 1.8
> scala 2.11.12
> Reporter: Ross M. Lodge
> Priority: Major
>
> We encountered an intermittent StackOverflowError with a stack trace similar to:
>
> {noformat}
> Exception in thread "JobGenerator" java.lang.StackOverflowError
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509){noformat}
> The name of the thread has been seen to be either "JobGenerator" or "streaming-start", depending on when in the lifecycle of the job the problem occurs. It appears to only occur in streaming jobs with checkpointing and WAL enabled; this has prevented us from upgrading to v2.4.0.
>
> Via debugging, we tracked this down to allocateBlocksToBatch in ReceivedBlockTracker:
> {code:java}
> /**
> * Allocate all unallocated blocks to the given batch.
> * This event will get written to the write ahead log (if enabled).
> */
> def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
> if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
> val streamIdToBlocks = streamIds.map { streamId =>
> (streamId, getReceivedBlockQueue(streamId).clone())
> }.toMap
> val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
> if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
> streamIds.foreach(getReceivedBlockQueue(_).clear())
> timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
> lastAllocatedBatchTime = batchTime
> } else {
> logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
> }
> } else {
> // This situation occurs when:
> // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
> // possibly processed batch job or half-processed batch job need to be processed again,
> // so the batchTime will be equal to lastAllocatedBatchTime.
> // 2. Slow checkpointing makes recovered batch time older than WAL recovered
> // lastAllocatedBatchTime.
> // This situation will only occurs in recovery time.
> logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
> }
> }
> {code}
> Prior to 2.3.1, this code did
> {code:java}
> getReceivedBlockQueue(streamId).dequeueAll(x => true){code}
> but it was changed as part of SPARK-23991 to
> {code:java}
> getReceivedBlockQueue(streamId).clone(){code}
> We've not been able to reproduce this in a test of the actual above method, but we've been able to produce a test that reproduces it by putting a lot of values into the queue:
>
> {code:java}
> class SerializationFailureTest extends FunSpec {
> private val logger = LoggerFactory.getLogger(getClass)
> private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
> describe("Queue") {
> it("should be serializable") {
> runTest(1062)
> }
> it("should not be serializable") {
> runTest(1063)
> }
> it("should DEFINITELY not be serializable") {
> runTest(199952)
> }
> }
> private def runTest(mx: Int): Array[Byte] = {
> try {
> val random = new scala.util.Random()
> val queue = new ReceivedBlockQueue()
> for (_ <- 0 until mx) {
> queue += ReceivedBlockInfo(
> streamId = 0,
> numRecords = Some(random.nextInt(5)),
> metadataOption = None,
> blockStoreResult = WriteAheadLogBasedStoreResult(
> blockId = StreamBlockId(0, random.nextInt()),
> numRecords = Some(random.nextInt(5)),
> walRecordHandle = FileBasedWriteAheadLogSegment(
> path = s"""hdfs://foo.bar.com:8080/spark/streaming/BAZ/00007/receivedData/0/log-${random.nextInt()}-${random.nextInt()}""",
> offset = random.nextLong(),
> length = random.nextInt()
> )
> )
> )
> }
> val record = BatchAllocationEvent(
> Time(1548320400000L), AllocatedBlocks(
> Map(
> 0 -> queue
> )
> )
> )
> Utils.serialize(record)
> } catch {
> case t: Throwable =>
> fail(t)
> }
> }
> }
> {code}
> In my tests it seemed like the serialization would fail if there were ~1064 elements in the queue. I'm _assuming_ that this is actually a scala bug, though I haven't tried reproducing it without the involvement of the spark objects.
> I expect this could be solved by transforming the cloned queue into a different type of Seq.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org