Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/07/08 10:59:10 UTC

[jira] [Comment Edited] (SPARK-16417) spark 1.5.2 receiver store(single-record) with ahead log enabled makes executor crash if there is an exception when BlockGenerator storing block

    [ https://issues.apache.org/jira/browse/SPARK-16417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367486#comment-15367486 ] 

Sean Owen edited comment on SPARK-16417 at 7/8/16 10:58 AM:
------------------------------------------------------------

I referred to the 1.6.2 source code and found the same problem. Below is a code snippet from the Scala class org.apache.spark.streaming.receiver.BlockGenerator.

{code}
  /** Keep pushing blocks to the BlockManager. */
  private def keepPushingBlocks() {
    logInfo("Started block pushing thread")

    def areBlocksBeingGenerated: Boolean = synchronized {
      state != StoppedGeneratingBlocks
    }

    try {
      // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
      while (areBlocksBeingGenerated) {
        Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
          case Some(block) => pushBlock(block)
          case None =>
        }
      }

      // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
      logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
      while (!blocksForPushing.isEmpty) {
        val block = blocksForPushing.take()
        logDebug(s"Pushing block $block")
        pushBlock(block)
        logInfo("Blocks left to push " + blocksForPushing.size())
      }
      logInfo("Stopped block pushing thread")
    } catch {
      case ie: InterruptedException =>
        logInfo("Block pushing thread was interrupted")
      case e: Exception =>
        reportError("Error in block pushing thread", e)
    }
  }
{code}

This function 'keepPushingBlocks' shows that if an exception is thrown while calling pushBlock(block), the thread just reports the error message and exits without crashing the executor. This means the receiver keeps putting single records into memory until the executor goes OOM, and only at that point can I spot the problem (from the Spark UI), which is really too late.
This case only happens when using receiver.store(single-record). In my use case, the exception that was thrown is listed below:

WARN [org.apache.spark.streaming.scheduler.ReceiverTracker:logWarning:71] - Error reported by receiver for stream 3: Futures timed out after [30 seconds] - java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:202)
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:156)
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:127)
    at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
    at com.jd.aof.spark.streaming.receiver.BlockProducer.pushBlockWithRetry(BlockProducer.scala:157)
    at com.jd.aof.spark.streaming.receiver.BlockProducer.keepPushingBlocks(BlockProducer.scala:134)
    at com.jd.aof.spark.streaming.receiver.BlockProducer$$anon$2.run(BlockProducer.scala:42)

I don't know how to fix this timeout exception (it happens occasionally, maybe once every two days); I would really appreciate any advice if you know how to fix it. But turning back to Spark, my understanding is that this thread should crash the executor instead of eating the exception.
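
To illustrate what I mean by not eating the exception, here is just a sketch of the idea (not a tested patch, and I have not checked whether escalating from inside BlockGenerator's block pushing thread is safe): the catch clause could rethrow after reporting, e.g.

{code}
    } catch {
      case ie: InterruptedException =>
        logInfo("Block pushing thread was interrupted")
      case e: Exception =>
        reportError("Error in block pushing thread", e)
        // Sketch only: rethrow so the failure escapes the block pushing thread
        // instead of leaving the receiver running with nothing left to drain
        // its buffers.
        throw e
    }
{code}

If I understand the executor setup correctly, an uncaught exception here would reach the default uncaught exception handler that Spark installs on executors and make the failure visible right away, which I would prefer over discovering it through an OOM much later.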

To avoid this issue, I now generate the block myself, use the store(ArrayBuffer) interface, and handle the uncertain exception myself.
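
For reference, here is a rough (and simplified) sketch of that workaround; MyBatchingReceiver, readOneRecord, and the batch/retry parameters are made-up placeholders, and only store(ArrayBuffer), restart and isStopped are the real Receiver APIs I rely on:

{code}
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch: batch the records myself and store whole blocks, so a failing
// store() can be retried or escalated by my own code instead of being
// swallowed inside BlockGenerator's block pushing thread.
class MyBatchingReceiver(batchSize: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  override def onStart(): Unit = {
    new Thread("batching-receiver-thread") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = { }  // nothing to clean up in this sketch

  private def receive(): Unit = {
    var buffer = new ArrayBuffer[String](batchSize)
    while (!isStopped()) {
      buffer += readOneRecord()          // placeholder for the real source
      if (buffer.size >= batchSize) {
        storeWithRetry(buffer)           // hand the full buffer off as one block
        buffer = new ArrayBuffer[String](batchSize)
      }
    }
  }

  // Retry store(ArrayBuffer) a few times; if it keeps failing (e.g. the WAL
  // write times out), ask Spark to restart the receiver so the failure is
  // visible instead of records piling up silently.
  private def storeWithRetry(block: ArrayBuffer[String], maxRetries: Int = 3): Unit = {
    var attempt = 0
    var done = false
    while (!done && attempt < maxRetries) {
      try {
        store(block)                     // blocks until the (WAL-backed) store completes
        done = true
      } catch {
        case e: Exception =>
          attempt += 1
          if (attempt >= maxRetries) {
            restart(s"Failed to store block after $maxRetries attempts", e)
            done = true                  // give up; Spark will restart the receiver
          }
      }
    }
  }

  // Placeholder: read one record from the actual upstream source.
  private def readOneRecord(): String = ???
}
{code}

The stream is then created with ssc.receiverStream(new MyBatchingReceiver(1000)) as usual; the point is only that the try/catch around store(ArrayBuffer) is now under my own control.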



> spark 1.5.2 receiver store(single-record) with ahead log enabled makes executor crash if there is an exception when BlockGenerator storing block
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-16417
>                 URL: https://issues.apache.org/jira/browse/SPARK-16417
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.5.2
>         Environment: spark streaming version 1.5.2.
>            Reporter: ren xing
>
> The receiver has a store(single-record) function which actually just puts the record into a buffer. One backend thread periodically scans this buffer, generates a block, and stores that block in Spark. With the write-ahead log enabled, there is sometimes an exception when writing the ahead log. This exception is caught by the backend thread, but the thread just prints a message AND EXITS! This means there is no longer any consumer for the receiver's internally buffered records, and as time goes on the executor will OOM.


