You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/02/06 16:44:46 UTC

[spark] branch branch-2.4 updated: [SPARK-26734][STREAMING] Fix StackOverflowError with large block queue

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 9b2eedc  [SPARK-26734][STREAMING] Fix StackOverflowError with large block queue
9b2eedc is described below

commit 9b2eedc5beb32e99eedbdca344278243531b1c52
Author: Ross Lodge <rl...@concentricsky.com>
AuthorDate: Wed Feb 6 08:43:40 2019 -0800

    [SPARK-26734][STREAMING] Fix StackOverflowError with large block queue
    
    ## What changes were proposed in this pull request?
    
    SPARK-23991 introduced a bug in `ReceivedBlockTracker#allocateBlocksToBatch`: when a queue with more than a few thousand blocks are in the queue, serializing the queue throws a StackOverflowError.  This change just adds `dequeueAll` to the new `clone` operation on the queue so that the fix in 23991 is preserved but the serialized data comes from an ArrayBuffer which doesn't have the serialization problems that mutable.Queue has.
    
    ## How was this patch tested?
    
    A unit test was added.
    
    Closes #23716 from rlodge/SPARK-26734.
    
    Authored-by: Ross Lodge <rl...@concentricsky.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
    (cherry picked from commit 8427e9ba5cae28233d1bdc54208b46889b83a821)
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../streaming/scheduler/ReceivedBlockTracker.scala |  6 +++++-
 .../streaming/ReceivedBlockTrackerSuite.scala      | 25 ++++++++++++++++++++--
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index cf43245..a9763cf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -111,8 +111,12 @@ private[streaming] class ReceivedBlockTracker(
    */
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
+      // We explicitly create an ArrayBuffer here because at least as of Scala 2.11 and 2.12
+      // a mutable.Queue fails serialization with a StackOverflow error if it has more than
+      // a few thousand elements.  So we explicitly allocate a collection for serialization which
+      // we know doesn't have this issue.  (See SPARK-26734).
       val streamIdToBlocks = streamIds.map { streamId =>
-        (streamId, getReceivedBlockQueue(streamId).clone())
+        (streamId, mutable.ArrayBuffer(getReceivedBlockQueue(streamId).clone(): _*))
       }.toMap
       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index fd7e00b..c0f5eff 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -96,6 +96,27 @@ class ReceivedBlockTrackerSuite
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }
 
+  test("block addition, and block to batch allocation with many blocks") {
+    val receivedBlockTracker = createTracker()
+    receivedBlockTracker.isWriteAheadLogEnabled should be (true)
+
+    val blockInfos = generateBlockInfos(100000)
+    blockInfos.map(receivedBlockTracker.addBlock)
+    receivedBlockTracker.allocateBlocksToBatch(1)
+
+    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+    receivedBlockTracker.hasUnallocatedReceivedBlocks should be (false)
+    receivedBlockTracker.getBlocksOfBatch(1) shouldEqual Map(streamId -> blockInfos)
+    receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos
+
+    val expectedWrittenData1 = blockInfos.map(BlockAdditionEvent) :+
+      BatchAllocationEvent(1, AllocatedBlocks(Map(streamId -> blockInfos)))
+    getWrittenLogData() shouldEqual expectedWrittenData1
+    getWriteAheadLogFiles() should have size 1
+
+    receivedBlockTracker.stop()
+  }
+
   test("recovery with write ahead logs should remove only allocated blocks from received queue") {
     val manualClock = new ManualClock
     val batchTime = manualClock.getTimeMillis()
@@ -362,8 +383,8 @@ class ReceivedBlockTrackerSuite
   }
 
   /** Generate blocks infos using random ids */
-  def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
-    List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
+  def generateBlockInfos(blockCount: Int = 5): Seq[ReceivedBlockInfo] = {
+    List.fill(blockCount)(ReceivedBlockInfo(streamId, Some(0L), None,
       BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org