Posted to issues@spark.apache.org by "Ross M. Lodge (JIRA)" <ji...@apache.org> on 2019/01/26 00:20:00 UTC
[jira] [Created] (SPARK-26734) StackOverflowError on WAL serialization caused by large receivedBlockQueue
Ross M. Lodge created SPARK-26734:
-------------------------------------
Summary: StackOverflowError on WAL serialization caused by large receivedBlockQueue
Key: SPARK-26734
URL: https://issues.apache.org/jira/browse/SPARK-26734
Project: Spark
Issue Type: Bug
Components: Block Manager
Affects Versions: 2.4.0, 2.3.2, 2.3.1
Environment: Spark 2.4.0 streaming job
Java 1.8
Scala 2.11.12
Reporter: Ross M. Lodge
We encountered an intermittent StackOverflowError with a stack trace similar to:
{noformat}
Exception in thread "JobGenerator" java.lang.StackOverflowError
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509){noformat}
The thread has been named either "JobGenerator" or "streaming-start", depending on when in the job's lifecycle the problem occurs. It appears to occur only in streaming jobs that have both checkpointing and the write-ahead log (WAL) enabled. This has prevented us from upgrading to v2.4.0.
Via debugging, we tracked this down to allocateBlocksToBatch in ReceivedBlockTracker:
{code:java}
/**
 * Allocate all unallocated blocks to the given batch.
 * This event will get written to the write ahead log (if enabled).
 */
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).clone())
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      streamIds.foreach(getReceivedBlockQueue(_).clear())
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
  }
}
{code}
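The method above takes a snapshot of each queue and writes that snapshot to the WAL. It clears the live queue only if the write succeeds. The following is a minimal plain-Scala model of just that control flow, so it can run without Spark. BlockAllocator, receive, pending and the writeToLog callback are hypothetical names, and string IDs stand in for ReceivedBlockInfo.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the allocation step above.
// BlockAllocator and writeToLog are illustrative names, not Spark's API.
final class BlockAllocator(writeToLog: collection.Seq[String] => Boolean) {
  private val queue = mutable.Queue.empty[String]
  val allocated = mutable.Map.empty[Long, collection.Seq[String]]

  def receive(block: String): Unit = synchronized { queue += block }

  def pending: Int = synchronized { queue.size }

  // Snapshot the queue, try to log the snapshot, and clear the live queue
  // only after the write succeeds; on failure the blocks stay queued.
  def allocate(batchTime: Long): Boolean = synchronized {
    val snapshot = queue.clone()
    val logged = writeToLog(snapshot)
    if (logged) {
      queue.clear()
      allocated(batchTime) = snapshot
    }
    logged
  }
}
```

With a log writer that fails once, the first allocate call returns false and leaves both blocks pending. The second call logs them and empties the queue.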
Prior to 2.3.1, this code used
{code:java}
getReceivedBlockQueue(streamId).dequeueAll(x => true){code}
but it was changed as part of SPARK-23991 to
{code:java}
getReceivedBlockQueue(streamId).clone(){code}
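The two calls differ in two ways: what they do to the source queue, and what they return. The following standalone comparison uses plain integers in place of ReceivedBlockInfo. dequeueAll(_ => true) empties the queue immediately and returns the removed elements as a separate Seq; in the Scala 2.11 standard library, that Seq is built as an ArrayBuffer. clone() instead returns a copy of the queue and leaves the original untouched until it is explicitly cleared.

```scala
import scala.collection.mutable

// Pre-2.3.1 style: dequeueAll removes every element that matches the
// predicate and returns them as a separate Seq, so the source queue is
// empty as soon as the call returns.
val drainedQueue = mutable.Queue(1, 2, 3)
val drained = drainedQueue.dequeueAll(_ => true)

// 2.3.1+ style: clone() returns a copy and leaves the source queue intact,
// so it can be cleared later (for example, after a successful WAL write).
val clonedQueue = mutable.Queue(1, 2, 3)
val cloned = clonedQueue.clone()
```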
We have not been able to reproduce this in a test that calls allocateBlocksToBatch directly. However, the following test reproduces it by putting a large number of entries into the queue:
{code:java}
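package org.apache.spark.streaming

import scala.collection.mutable

import org.scalatest.FunSpec
import org.slf4j.LoggerFactory

import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.WriteAheadLogBasedStoreResult
import org.apache.spark.streaming.scheduler.{AllocatedBlocks, BatchAllocationEvent, ReceivedBlockInfo}
import org.apache.spark.streaming.util.FileBasedWriteAheadLogSegment
import org.apache.spark.util.Utils

// Placed in org.apache.spark.streaming because the WAL event and block-info
// classes (and Utils) are package-private to Spark.
class SerializationFailureTest extends FunSpec {
  private val logger = LoggerFactory.getLogger(getClass)
  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

  // Each test calls fail(t) if serialization throws; the test names describe
  // the outcome that was observed, so the last two are expected to fail.
  describe("Queue") {
    it("should be serializable") {
      runTest(1062)
    }
    it("should not be serializable") {
      runTest(1063)
    }
    it("should DEFINITELY not be serializable") {
      runTest(199952)
    }
  }

  // Fills a queue with mx random block infos, wraps it in a
  // BatchAllocationEvent and Java-serializes the event.
  private def runTest(mx: Int): Array[Byte] = {
    try {
      val random = new scala.util.Random()
      val queue = new ReceivedBlockQueue()
      for (_ <- 0 until mx) {
        queue += ReceivedBlockInfo(
          streamId = 0,
          numRecords = Some(random.nextInt(5)),
          metadataOption = None,
          blockStoreResult = WriteAheadLogBasedStoreResult(
            blockId = StreamBlockId(0, random.nextInt()),
            numRecords = Some(random.nextInt(5)),
            walRecordHandle = FileBasedWriteAheadLogSegment(
              path = s"hdfs://foo.bar.com:8080/spark/streaming/BAZ/00007/receivedData/0/log-${random.nextInt()}-${random.nextInt()}",
              offset = random.nextLong(),
              length = random.nextInt()
            )
          )
        )
      }
      val record = BatchAllocationEvent(
        Time(1548320400000L),
        AllocatedBlocks(Map(0 -> queue))
      )
      Utils.serialize(record)
    } catch {
      case t: Throwable =>
        fail(t)
    }
  }
}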
{code}
In my tests, serialization appeared to fail once the queue held ~1064 elements. I'm _assuming_ that this is actually a Scala bug, though I haven't tried reproducing it without the Spark classes involved.
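One explanation fits the repeated writeObject0 / writeOrdinaryObject / writeSerialData / defaultWriteFields frames in the stack trace. In Scala 2.11, mutable.Queue extends MutableList, which keeps its elements in a chain of LinkedList nodes. Default Java serialization writes each node's next reference by recursing into it, so the stack depth grows with the number of elements. The following sketch models that with a hypothetical Node class rather than the real Queue, whose internals differ between Scala versions. Short chains serialize, and long ones exhaust the stack. The exact threshold depends on the thread's stack size, so the ~1064 figure should not be expected to carry over to other environments.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-in for one linked-list node: an element plus a
// reference to the next node. It relies on default Java serialization,
// which writes `next` by recursing into it.
final class Node(val elem: Int, val next: Node) extends Serializable

object SerializationDepth {
  // Builds the chain 0 -> 1 -> ... -> n-1 iteratively, so construction
  // itself never recurses.
  def linkedChain(n: Int): Node = {
    var head: Node = null
    var i = n - 1
    while (i >= 0) {
      head = new Node(i, head)
      i -= 1
    }
    head
  }

  // True if Java serialization of obj completes; false if it runs out of
  // stack, as in the JobGenerator / streaming-start traces above.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: StackOverflowError => false
    } finally {
      out.close()
    }
  }
}
```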
I expect this could be solved by transforming the cloned queue into a different type of Seq.
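The following is a minimal sketch of that idea. It assumes an array-backed collection such as Vector is acceptable as the type of the logged snapshot. toVector copies the queue, so the live queue stays intact for the later clear(), and a Vector's serialized form does not nest one level per element. BlockInfo is a hypothetical stand-in for ReceivedBlockInfo, and SnapshotFix is an illustrative name. This is not necessarily the change adopted in Spark.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

// Hypothetical stand-in for Spark's ReceivedBlockInfo.
final case class BlockInfo(streamId: Int, blockId: Long)

object SnapshotFix {
  // Copies the live queue into a Vector (a shallow tree of arrays). The
  // queue itself is unchanged, so it can still be cleared only after the
  // WAL write succeeds.
  def snapshot(queue: mutable.Queue[BlockInfo]): Vector[BlockInfo] =
    queue.toVector

  // Java-serializes obj and returns the number of bytes written.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj)
    finally out.close()
    bytes.size()
  }
}
```

In allocateBlocksToBatch, this would roughly mean replacing getReceivedBlockQueue(streamId).clone() with getReceivedBlockQueue(streamId).toVector. That assumes AllocatedBlocks accepts any Seq[ReceivedBlockInfo] per stream, as its use with the cloned queue suggests.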