Posted to issues@spark.apache.org by "Stavros Kontopoulos (JIRA)" <ji...@apache.org> on 2016/07/11 12:53:11 UTC

[jira] [Comment Edited] (SPARK-16480) Streaming checkpointing does not work well with SIGTERM

    [ https://issues.apache.org/jira/browse/SPARK-16480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15370644#comment-15370644 ] 

Stavros Kontopoulos edited comment on SPARK-16480 at 7/11/16 12:52 PM:
-----------------------------------------------------------------------

[~srowen] OK, the format is fixed.
SIGINT causes the same issue, and the code is valid: it does not make any nested RDD calls.
SIGTERM should run the shutdown hooks.
What is the proper documented method to stop a streaming job?
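
For context on the question above, here is a minimal sketch of the two shutdown paths available in the 1.x streaming API as far as I know: setting spark.streaming.stopGracefullyOnShutdown so that the JVM shutdown hook stops the context gracefully on SIGTERM/SIGINT, or calling StreamingContext.stop explicitly from the driver. The object name, app name, socket source, marker file path, and polling interval are all hypothetical, and this is an illustration of the API only, not a confirmed fix for the problem reported here.

```scala
import java.nio.file.{Files, Paths}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object GracefulStopSketch {
  // Hypothetical marker file: creating it asks the driver to stop.
  val StopMarker = "/tmp/stop-streaming-job"

  def main(args: Array[String]): Unit = {
    // The master URL is expected to come from spark-submit.
    val conf = new SparkConf()
      .setAppName("graceful-stop-sketch") // hypothetical name
      // Path 1: let the shutdown hook call stop(stopGracefully = true).
      .set("spark.streaming.stopGracefullyOnShutdown", "true")

    val ssc = new StreamingContext(conf, Seconds(1))

    // Placeholder source and output operation (assumed, not from the report).
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD { rdd => rdd.take(10).foreach(println) }

    ssc.start()

    // Path 2: poll for the marker file and stop explicitly from the driver.
    var stopped = false
    while (!stopped) {
      // Returns true once the context has been stopped.
      stopped = ssc.awaitTerminationOrTimeout(5000L)
      if (!stopped && Files.exists(Paths.get(StopMarker))) {
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        stopped = true
      }
    }
  }
}
```

With the first path, kill -15 on the driver should produce the "Invoking stop(stopGracefully=true) from shutdown hook" log line; the second path avoids signals entirely.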


was (Author: skonto):
[~srowen] OK, I will fix the format.
SIGINT causes the same issue, and the code is valid: it does not make any nested RDD calls.
SIGTERM should run the shutdown hooks.
What is the proper documented method to stop a streaming job?

> Streaming checkpointing does not work well with SIGTERM
> -------------------------------------------------------
>
>                 Key: SPARK-16480
>                 URL: https://issues.apache.org/jira/browse/SPARK-16480
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.1
>            Reporter: Stavros Kontopoulos
>
> A customer gets the following exception when trying to gracefully stop a streaming job with SIGTERM:
> {quote}
> org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
> 	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> {quote}
> This suggests that the checkpointing data is left in an invalid state. The problem can be reproduced easily with the following code skeleton (a Kafka direct stream is used):
> {quote}
> val dataStream = myKafkaDirectStream
>   .mapWithState(stateSpec).stateSnapshots().foreachRDD { data =>
>   // back up the state
>   data.cache()
>   data.collect().foreach(println)
>   data.map { x => x._1 + "," + x._2 }.saveAsTextFile(inputPath)
> ..
> }
> {quote}
> inputPath above serves as both the initialRdd location and the checkpointing dir (using a different path does not affect the issue).
> The streaming context is created correctly with getOrCreate, and all transformations and actions are defined inside the function passed to it.
> To reproduce, run the job (1st run), stop it with kill -15 ..., and then restart it (2nd run).
> An empty local folder can be used for "inputPath", the checkpointing path.
> The checkpointing blocks from the first run:
> {quote}
> 16/07/11 13:59:51 DEBUG DirectKafkaInputDStream: Updated checkpoint data for time 1468234791000 ms: [
> 4 checkpoint files 
> 1468234791000 ms -> [Lscala.Tuple4;@60a55c45
> 1468234790000 ms -> [Lscala.Tuple4;@58e5be3
> 1468234789000 ms -> [Lscala.Tuple4;@13cf6be7
> 1468234788000 ms -> [Lscala.Tuple4;@6017d6ae
> ]
> 16/07/11 13:59:51 DEBUG InternalMapWithStateDStream: Updated checkpoint data for time 1468234791000 ms: [
> 0 checkpoint files 
> ]
> 16/07/11 13:59:51 DEBUG FlatMappedDStream: Updated checkpoint data for time 1468234791000 ms: [
> 0 checkpoint files 
> ]
> 16/07/11 13:59:51 DEBUG ForEachDStream: Updated checkpoint data for time 1468234791000 ms: [
> 0 checkpoint files 
> ]
> {quote}
> In the first run I do see the graceful stop being invoked:
> {quote}
> 16/07/11 13:59:49 INFO StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
> {quote}
> Here is the log output from the second run, when the recovery occurs from the checkpointing data:
> {quote}
> 16/07/11 14:00:02 DEBUG FileBasedWriteAheadLogReader: Error reading next item, EOF reached
> java.io.EOFException
> 	at java.io.DataInputStream.readInt(DataInputStream.java:392)
> 	at org.apache.spark.streaming.util.FileBasedWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)
> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> 	at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:432)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$recoverPastEvents$1.apply(ReceivedBlockTracker.scala:212)
> 	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$recoverPastEvents$1.apply(ReceivedBlockTracker.scala:210)
> 	at scala.Option.foreach(Option.scala:236)
> 	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.recoverPastEvents(ReceivedBlockTracker.scala:210)
> 	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.<init>(ReceivedBlockTracker.scala:81)
> 	at org.apache.spark.streaming.scheduler.ReceiverTracker.<init>(ReceiverTracker.scala:106)
> 	at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:80)
> 	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:610)
> 	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
> 	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
> 	at org.apache.spark.util.ThreadUtils$$anon$1.run(ThreadUtils.scala:122)
> 16/07/11 14:00:02 INFO JobGenerator: Batches during down time (12 batches): 1468234791000 ms, 1468234792000 ms, 1468234793000 ms, 1468234794000 ms, 1468234795000 ms, 1468234796000 ms, 1468234797000 ms, 1468234798000 ms, 1468234799000 ms, 1468234800000 ms, 1468234801000 ms, 1468234802000 ms
> 16/07/11 14:00:02 INFO JobGenerator: Batches pending processing (0 batches): 
> 16/07/11 14:00:02 INFO JobGenerator: Batches to reschedule (12 batches): 1468234791000 ms, 1468234792000 ms, 1468234793000 ms, 1468234794000 ms, 1468234795000 ms, 1468234796000 ms, 1468234797000 ms, 1468234798000 ms, 1468234799000 ms, 1468234800000 ms, 1468234801000 ms, 1468234802000 ms
> 16/07/11 14:00:02 DEBUG DStreamGraph: Generating jobs for time 1468234791000 ms
> 16/07/11 14:00:02 DEBUG FlatMappedDStream: Time 1468234791000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234791000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234790000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234789000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234788000 ms is valid
> 16/07/11 14:00:02 INFO InternalMapWithStateDStream: Time 1468234787000 ms is invalid as zeroTime is 1468234787000 ms and slideDuration is 1000 ms and difference is 0 ms
> 16/07/11 14:00:02 ERROR StreamingContext: Error starting the context, marking it as stopped
> org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
> 	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> 	at org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:530)
> 	at org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:189)
> {quote}
> The first exception in the 2nd run may be related to:
> https://mail-archives.apache.org/mod_mbox/spark-user/201509.mbox/%3CCAMz94CGJzCv6yHW1mOTU2QRX=Pdu2k_PCvJ0++JPNM=uPFgfXg@mail.gmail.com%3E
> and may be what causes the issue (I am not sure what is happening there).
> The issue was also reported to happen when using HDFS.
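
For readers unfamiliar with the setup described above, a minimal sketch of the getOrCreate pattern follows: the whole DStream graph is defined inside the creating function, which is only called when no checkpoint exists in the given directory. The object name, checkpoint path, socket source, and output operation are hypothetical stand-ins; the reporter's actual job uses a Kafka direct stream with mapWithState, which is not reproduced here.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoverySketch {
  // Hypothetical checkpoint location; an empty local folder on the first run.
  val CheckpointDir = "/tmp/streaming-checkpoint"

  // Builds a fresh context. Called only when CheckpointDir holds no
  // checkpoint; otherwise the context is rebuilt from the checkpoint data.
  def createContext(): StreamingContext = {
    // The master URL is expected to come from spark-submit.
    val conf = new SparkConf().setAppName("checkpoint-recovery-sketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(CheckpointDir)

    // All transformations and actions are defined inside this function.
    // Placeholder source and output operation (assumed, not from the report).
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD { rdd => rdd.take(10).foreach(println) }

    ssc
  }

  def main(args: Array[String]): Unit = {
    // 1st run: creates the context. 2nd run: recovers it from CheckpointDir.
    val ssc = StreamingContext.getOrCreate(CheckpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Running this once, stopping it, and starting it again exercises the same create-then-recover sequence as the 1st and 2nd runs in the description.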


