Posted to issues@spark.apache.org by "Saisai Shao (JIRA)" <ji...@apache.org> on 2015/01/14 02:39:34 UTC

[jira] [Commented] (SPARK-5220) keepPushingBlocks in BlockGenerator terminated when an exception occurs, which causes the block pushing thread to terminate and blocks receiver

    [ https://issues.apache.org/jira/browse/SPARK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14276350#comment-14276350 ] 

Saisai Shao commented on SPARK-5220:
------------------------------------

Hi Max, as I said in the mail, this is expected behavior of the receiver and block generator because of the locking mechanism in BlockGenerator. The receiver blocks on the lock while adding data into BlockGenerator, and BlockGenerator in turn waits for the pushing thread to write data to HDFS and the BlockManager. Given the mismatched speeds of the two sides, this blocking is expected, from my understanding.
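
A minimal sketch of that interaction (illustrative only, not Spark's actual code; the queue bound and the names here are assumptions): a bounded queue stands in for BlockGenerator's internal buffer, the main thread plays the receiver, and a separate thread plays the block pushing thread. Once the pusher falls behind, the queue fills and the receiver blocks on put().

    import java.util.concurrent.ArrayBlockingQueue

    object BackpressureSketch {
      // Bounded buffer standing in for BlockGenerator's block queue.
      private val queue = new ArrayBlockingQueue[String](10)

      // Plays the block pushing thread: drains the queue at whatever speed
      // the downstream store (WAL on HDFS + BlockManager) allows.
      private val pusher = new Thread(new Runnable {
        def run(): Unit = while (true) {
          val block = queue.take()
          Thread.sleep(1000) // simulate a slow store
          println("pushed " + block)
        }
      })

      def main(args: Array[String]): Unit = {
        pusher.setDaemon(true)
        pusher.start()
        var i = 0
        while (true) {
          queue.put("block-" + i) // blocks here once the pusher lags behind
          i += 1
        }
      }
    }

As long as the pushing thread stays alive this is ordinary backpressure: the receiver simply slows down to the speed of the store.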

> keepPushingBlocks in BlockGenerator terminated when an exception occurs, which causes the block pushing thread to terminate and blocks receiver  
> -------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5220
>                 URL: https://issues.apache.org/jira/browse/SPARK-5220
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Max Xu
>
> I am running a Spark Streaming application with ReliableKafkaReceiver. It uses BlockGenerator to push blocks to the BlockManager. However, writing WALs to HDFS may time out, which causes keepPushingBlocks in BlockGenerator to terminate.
> 15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>         at scala.concurrent.Await$.result(package.scala:107)
>         at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
>         at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
>         at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
>         at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
>         at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:86)
> Then the block pushing thread is done and no subsequent blocks can be pushed into the BlockManager. In turn this blocks the receiver from receiving new data.
> So when the TimeoutException happens while my app is running, the ReliableKafkaReceiver stays in ACTIVE status but doesn't do anything at all. The application hangs.
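
For reference, here is a minimal sketch of the failure mode described above (illustrative only, not Spark's BlockGenerator code; storeBlock is a hypothetical stand-in for the WAL write in the trace): the pushing loop awaits each store with a 30-second timeout, a single TimeoutException escapes the loop, and the thread exits. After that nothing drains the buffer, so the receiver blocks forever while staying ACTIVE.

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    object PushingThreadSketch {
      // Hypothetical stand-in for WriteAheadLogBasedBlockHandler.storeBlock:
      // a WAL write to HDFS that never completes in time.
      def storeBlock(id: Int): Future[Unit] = Future { Thread.sleep(60000) }

      def main(args: Array[String]): Unit = {
        val pusher = new Thread(new Runnable {
          def run(): Unit =
            try {
              var id = 0
              while (true) {
                // Throws java.util.concurrent.TimeoutException after 30 s,
                // matching "Futures timed out after [30 seconds]" above.
                Await.result(storeBlock(id), 30.seconds)
                id += 1
              }
            } catch {
              case t: Throwable =>
                // The loop ends here; no blocks are pushed afterwards,
                // mirroring "Error in block pushing thread" in the log.
                println("Error in block pushing thread: " + t)
            }
        })
        pusher.start()
      }
    }

Once the pusher thread dies, the producer side (the receiver) stays blocked on the full buffer indefinitely, so the application looks alive but makes no progress, matching the hang reported above.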



