Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/07/11 11:52:11 UTC

[jira] [Commented] (SPARK-16480) Streaming checkpointing does not work well with SIGTERM

    [ https://issues.apache.org/jira/browse/SPARK-16480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15370602#comment-15370602 ] 

Sean Owen commented on SPARK-16480:
-----------------------------------

[~skonto] can you edit this down a bit / format it better? I'm having trouble making sense of it. Yes, you can't invoke RDD actions in a remotely-executed function, but that's self-evident and not related to checkpoints. It's perhaps a knock-on error. The timestamp message is correct, but also perhaps irrelevant; what is zeroTime in this case given how you ran it? SIGTERM is a hard way to kill a process; why not SIGINT? I think this caused a corrupted checkpoint file, which you then couldn't read back.

> Streaming checkpointing does not work well with SIGTERM
> -------------------------------------------------------
>
>                 Key: SPARK-16480
>                 URL: https://issues.apache.org/jira/browse/SPARK-16480
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.1
>            Reporter: Stavros Kontopoulos
>
> A customer gets the following exception when trying to gracefully stop a streaming job with SIGTERM:
> org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
> 	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> This implies an invalid state in the checkpointed data. This can be reproduced easily with the following code skeleton (a Kafka direct stream is used):
> val dataStream = myKafkaDirectStream
>   .mapWithState(stateSpec)
>   .stateSnapshots()
>   .foreachRDD { data =>
>     // back up the state
>     data.cache()
>     data.collect().foreach(println)
>     data.map { x => x._1 + "," + x._2 }.saveAsTextFile(inputPath)
>     ..
>   }
> inputPath is used both for the initialRdd and as the checkpointing dir (using a different path does not affect the issue).
> The streaming context is created correctly with getOrCreate, and all transformations and actions are put in there.
> To reproduce, just run the job, stop it with kill -15 ... and then restart it.
> You can use an empty local fs folder for "inputPath", i.e. the checkpointing folder.
> The checkpoint data updated during the first run:
> 16/07/11 13:59:51 DEBUG DirectKafkaInputDStream: Updated checkpoint data for time 1468234791000 ms: [
> 4 checkpoint files 
> 1468234791000 ms -> [Lscala.Tuple4;@60a55c45
> 1468234790000 ms -> [Lscala.Tuple4;@58e5be3
> 1468234789000 ms -> [Lscala.Tuple4;@13cf6be7
> 1468234788000 ms -> [Lscala.Tuple4;@6017d6ae
> ]
> 16/07/11 13:59:51 DEBUG InternalMapWithStateDStream: Updated checkpoint data for time 1468234791000 ms: [
> 0 checkpoint files 
> ]
> 16/07/11 13:59:51 DEBUG FlatMappedDStream: Updated checkpoint data for time 1468234791000 ms: [
> 0 checkpoint files 
> ]
> 16/07/11 13:59:51 DEBUG ForEachDStream: Updated checkpoint data for time 1468234791000 ms: [
> 0 checkpoint files 
> ]
> The invalid timestamp issue which causes the exception:
> 16/07/11 14:00:02 DEBUG FileBasedWriteAheadLogReader: Error reading next item, EOF reached
> java.io.EOFException
> 	at java.io.DataInputStream.readInt(DataInputStream.java:392)
> 	at org.apache.spark.streaming.util.FileBasedWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)
> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> 	at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:432)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$recoverPastEvents$1.apply(ReceivedBlockTracker.scala:212)
> 	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$recoverPastEvents$1.apply(ReceivedBlockTracker.scala:210)
> 	at scala.Option.foreach(Option.scala:236)
> 	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.recoverPastEvents(ReceivedBlockTracker.scala:210)
> 	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.<init>(ReceivedBlockTracker.scala:81)
> 	at org.apache.spark.streaming.scheduler.ReceiverTracker.<init>(ReceiverTracker.scala:106)
> 	at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:80)
> 	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:610)
> 	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
> 	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
> 	at org.apache.spark.util.ThreadUtils$$anon$1.run(ThreadUtils.scala:122)
> 16/07/11 14:00:02 INFO JobGenerator: Batches during down time (12 batches): 1468234791000 ms, 1468234792000 ms, 1468234793000 ms, 1468234794000 ms, 1468234795000 ms, 1468234796000 ms, 1468234797000 ms, 1468234798000 ms, 1468234799000 ms, 1468234800000 ms, 1468234801000 ms, 1468234802000 ms
> 16/07/11 14:00:02 INFO JobGenerator: Batches pending processing (0 batches): 
> 16/07/11 14:00:02 INFO JobGenerator: Batches to reschedule (12 batches): 1468234791000 ms, 1468234792000 ms, 1468234793000 ms, 1468234794000 ms, 1468234795000 ms, 1468234796000 ms, 1468234797000 ms, 1468234798000 ms, 1468234799000 ms, 1468234800000 ms, 1468234801000 ms, 1468234802000 ms
> 16/07/11 14:00:02 DEBUG DStreamGraph: Generating jobs for time 1468234791000 ms
> 16/07/11 14:00:02 DEBUG FlatMappedDStream: Time 1468234791000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234791000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234790000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234789000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234788000 ms is valid
> 16/07/11 14:00:02 INFO InternalMapWithStateDStream: Time 1468234787000 ms is invalid as zeroTime is 1468234787000 ms and slideDuration is 1000 ms and difference is 0 ms
> 16/07/11 14:00:02 ERROR StreamingContext: Error starting the context, marking it as stopped
> org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
> 	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> 	at org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:530)
> 	at org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:189)
> Maybe the first exception relates to:
> https://mail-archives.apache.org/mod_mbox/spark-user/201509.mbox/%3CCAMz94CGJzCv6yHW1mOTU2QRX=Pdu2k_PCvJ0++JPNM=uPFgfXg@mail.gmail.com%3E
> and causes the issue (not sure what is happening there).
> In the first run I successfully see:
> 16/07/11 13:59:49 INFO StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
> The issue was also reported to happen when using HDFS.


