You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/02/06 16:44:46 UTC
[spark] branch branch-2.4 updated: [SPARK-26734][STREAMING] Fix StackOverflowError with large block queue
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 9b2eedc [SPARK-26734][STREAMING] Fix StackOverflowError with large block queue
9b2eedc is described below
commit 9b2eedc5beb32e99eedbdca344278243531b1c52
Author: Ross Lodge <rl...@concentricsky.com>
AuthorDate: Wed Feb 6 08:43:40 2019 -0800
[SPARK-26734][STREAMING] Fix StackOverflowError with large block queue
## What changes were proposed in this pull request?
SPARK-23991 introduced a bug in `ReceivedBlockTracker#allocateBlocksToBatch`: when more than a few thousand blocks are in the queue, serializing the queue throws a StackOverflowError. This change copies the result of the new `clone` operation on the queue into a `mutable.ArrayBuffer`. The fix from SPARK-23991 is therefore preserved, but the serialized data now comes from an ArrayBuffer, which doesn't have the serialization problem that `mutable.Queue` has.
## How was this patch tested?
A unit test was added.
Closes #23716 from rlodge/SPARK-26734.
Authored-by: Ross Lodge <rl...@concentricsky.com>
Signed-off-by: Sean Owen <se...@databricks.com>
(cherry picked from commit 8427e9ba5cae28233d1bdc54208b46889b83a821)
Signed-off-by: Sean Owen <se...@databricks.com>
---
.../streaming/scheduler/ReceivedBlockTracker.scala | 6 +++++-
.../streaming/ReceivedBlockTrackerSuite.scala | 25 ++++++++++++++++++++--
2 files changed, 28 insertions(+), 3 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index cf43245..a9763cf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -111,8 +111,12 @@ private[streaming] class ReceivedBlockTracker(
*/
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
+ // We explicitly create an ArrayBuffer here because at least as of Scala 2.11 and 2.12
+ // a mutable.Queue fails serialization with a StackOverflow error if it has more than
+ // a few thousand elements. So we explicitly allocate a collection for serialization which
+ // we know doesn't have this issue. (See SPARK-26734).
val streamIdToBlocks = streamIds.map { streamId =>
- (streamId, getReceivedBlockQueue(streamId).clone())
+ (streamId, mutable.ArrayBuffer(getReceivedBlockQueue(streamId).clone(): _*))
}.toMap
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index fd7e00b..c0f5eff 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -96,6 +96,27 @@ class ReceivedBlockTrackerSuite
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
}
+ test("block addition, and block to batch allocation with many blocks") {
+ val receivedBlockTracker = createTracker()
+ receivedBlockTracker.isWriteAheadLogEnabled should be (true)
+
+ val blockInfos = generateBlockInfos(100000)
+ blockInfos.map(receivedBlockTracker.addBlock)
+ receivedBlockTracker.allocateBlocksToBatch(1)
+
+ receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+ receivedBlockTracker.hasUnallocatedReceivedBlocks should be (false)
+ receivedBlockTracker.getBlocksOfBatch(1) shouldEqual Map(streamId -> blockInfos)
+ receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos
+
+ val expectedWrittenData1 = blockInfos.map(BlockAdditionEvent) :+
+ BatchAllocationEvent(1, AllocatedBlocks(Map(streamId -> blockInfos)))
+ getWrittenLogData() shouldEqual expectedWrittenData1
+ getWriteAheadLogFiles() should have size 1
+
+ receivedBlockTracker.stop()
+ }
+
test("recovery with write ahead logs should remove only allocated blocks from received queue") {
val manualClock = new ManualClock
val batchTime = manualClock.getTimeMillis()
@@ -362,8 +383,8 @@ class ReceivedBlockTrackerSuite
}
/** Generate blocks infos using random ids */
- def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
- List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
+ def generateBlockInfos(blockCount: Int = 5): Seq[ReceivedBlockInfo] = {
+ List.fill(blockCount)(ReceivedBlockInfo(streamId, Some(0L), None,
BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org