Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:37:33 UTC

[jira] [Resolved] (SPARK-16480) Streaming checkpointing does not work well with SIGTERM

     [ https://issues.apache.org/jira/browse/SPARK-16480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-16480.
----------------------------------
    Resolution: Incomplete

> Streaming checkpointing does not work well with SIGTERM
> -------------------------------------------------------
>
>                 Key: SPARK-16480
>                 URL: https://issues.apache.org/jira/browse/SPARK-16480
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 1.6.1
>            Reporter: Stavros Kontopoulos
>            Priority: Major
>              Labels: bulk-closed
>
> A customer gets the following exception when trying to gracefully stop a streaming job with SIGTERM:
> {quote}
> org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
> 	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> {quote}
> This implies that the checkpointing data is in an invalid state. The problem can be reproduced easily with the following code skeleton (a Kafka direct stream is used):
> {quote}
> val dataStream = myKafkaDirectStream
>   .mapWithState(stateSpec).stateSnapshots().foreachRDD { data =>
>   // back up the state
>   data.cache()
>   data.collect().foreach(println)
>   data.map { x => x._1 + "," + x._2 }.saveAsTextFile(inputPath)
> ..
> }
> {quote}
> inputPath above is used both for the initialRdd and as the checkpointing dir (using a different path for each does not affect the issue).
> The streaming context is created correctly with getOrCreate, and all transformations and actions are placed inside the function passed to it.
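For readers unfamiliar with the pattern described above, here is a minimal sketch of a job whose whole DStream graph is built inside the function handed to StreamingContext.getOrCreate. It is not the reporter's code: the socket source stands in for the Kafka direct stream, and the names GracefulStopSketch, checkpointDir, nextCount and mappingFunc are hypothetical. Setting spark.streaming.stopGracefullyOnShutdown is only one way a graceful stop on SIGTERM can be enabled; the report does not say which mechanism the customer used.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object GracefulStopSketch {
  // Hypothetical checkpoint location; the report uses "inputPath" for this.
  val checkpointDir = "/tmp/checkpoint-sketch"

  // Pure helper: new running count from the previous state and the incoming value.
  def nextCount(previous: Option[Int], value: Option[Int]): Int =
    previous.getOrElse(0) + value.getOrElse(0)

  // State update function for mapWithState: keeps a running count per key.
  val mappingFunc = (key: String, value: Option[Int], state: State[Int]) => {
    val sum = nextCount(state.getOption, value)
    state.update(sum)
    (key, sum)
  }

  // Everything that defines the streaming graph lives in this function,
  // so that it is only executed when no checkpoint exists yet.
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("graceful-stop-sketch")
      // Assumption: graceful stop on JVM shutdown is enabled through this setting.
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)

    // Stand-in source (assumption); the report uses a Kafka direct stream.
    val pairs = ssc.socketTextStream("localhost", 9999).map(word => (word, 1))

    pairs
      .mapWithState(StateSpec.function(mappingFunc))
      .stateSnapshots()
      .foreachRDD { rdd => rdd.collect().foreach(println) }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover from the checkpoint if present, otherwise build a fresh context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The sketch needs spark-streaming on the classpath and is meant to be launched with spark-submit, which supplies the master URL.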
> To reproduce, run the job (1st run), stop it with kill -15 ..., and then restart it (2nd run).
> You can use an empty local folder for "inputPath", the checkpointing path.
> The checkpointing blocks from the first run:
> {quote}
> 16/07/11 13:59:51 DEBUG DirectKafkaInputDStream: Updated checkpoint data for time 1468234791000 ms: [
> 4 checkpoint files 
> 1468234791000 ms -> [Lscala.Tuple4;@60a55c45
> 1468234790000 ms -> [Lscala.Tuple4;@58e5be3
> 1468234789000 ms -> [Lscala.Tuple4;@13cf6be7
> 1468234788000 ms -> [Lscala.Tuple4;@6017d6ae
> ]
> 16/07/11 13:59:51 DEBUG InternalMapWithStateDStream: Updated checkpoint data for time 1468234791000 ms: [
> 0 checkpoint files 
> ]
> 16/07/11 13:59:51 DEBUG FlatMappedDStream: Updated checkpoint data for time 1468234791000 ms: [
> 0 checkpoint files 
> ]
> 16/07/11 13:59:51 DEBUG ForEachDStream: Updated checkpoint data for time 1468234791000 ms: [
> 0 checkpoint files 
> ]
> {quote}
> In the first run I do see the graceful stop being invoked:
> {quote}
> 16/07/11 13:59:49 INFO StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
> {quote}
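The log line above is emitted from a JVM shutdown hook, which the JVM runs when the process receives SIGTERM (kill -15). The following is a minimal, Spark-free sketch of that mechanism only; ShutdownHookSketch and stopGracefully are hypothetical names, and the body is a placeholder for whatever a real job would do to stop gracefully.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object ShutdownHookSketch {
  // Guards against running the stop logic more than once.
  private val stopped = new AtomicBoolean(false)

  // Returns true only for the first caller; later calls are no-ops.
  def stopGracefully(): Boolean = {
    val first = stopped.compareAndSet(false, true)
    if (first) println("Stopping gracefully from shutdown hook")
    first
  }

  def main(args: Array[String]): Unit = {
    // The JVM runs registered hooks on normal exit and on SIGTERM,
    // but not on SIGKILL (kill -9).
    sys.addShutdownHook { stopGracefully(); () }
    println("Running; send SIGTERM (kill -15) to this JVM to trigger the hook")
    Thread.sleep(Long.MaxValue)
  }
}
```

Because the hook races with JVM termination, any work it starts has to finish before the process exits, which is why the state left on disk after a SIGTERM is worth inspecting.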
> Here is the log output from the second run, when the recovery occurs from the checkpointing data:
> {quote}
> 16/07/11 14:00:02 DEBUG FileBasedWriteAheadLogReader: Error reading next item, EOF reached
> java.io.EOFException
> 	at java.io.DataInputStream.readInt(DataInputStream.java:392)
> 	at org.apache.spark.streaming.util.FileBasedWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)
> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> 	at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:432)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$recoverPastEvents$1.apply(ReceivedBlockTracker.scala:212)
> 	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$recoverPastEvents$1.apply(ReceivedBlockTracker.scala:210)
> 	at scala.Option.foreach(Option.scala:236)
> 	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.recoverPastEvents(ReceivedBlockTracker.scala:210)
> 	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.<init>(ReceivedBlockTracker.scala:81)
> 	at org.apache.spark.streaming.scheduler.ReceiverTracker.<init>(ReceiverTracker.scala:106)
> 	at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:80)
> 	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:610)
> 	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
> 	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
> 	at org.apache.spark.util.ThreadUtils$$anon$1.run(ThreadUtils.scala:122)
> 16/07/11 14:00:02 INFO JobGenerator: Batches during down time (12 batches): 1468234791000 ms, 1468234792000 ms, 1468234793000 ms, 1468234794000 ms, 1468234795000 ms, 1468234796000 ms, 1468234797000 ms, 1468234798000 ms, 1468234799000 ms, 1468234800000 ms, 1468234801000 ms, 1468234802000 ms
> 16/07/11 14:00:02 INFO JobGenerator: Batches pending processing (0 batches): 
> 16/07/11 14:00:02 INFO JobGenerator: Batches to reschedule (12 batches): 1468234791000 ms, 1468234792000 ms, 1468234793000 ms, 1468234794000 ms, 1468234795000 ms, 1468234796000 ms, 1468234797000 ms, 1468234798000 ms, 1468234799000 ms, 1468234800000 ms, 1468234801000 ms, 1468234802000 ms
> 16/07/11 14:00:02 DEBUG DStreamGraph: Generating jobs for time 1468234791000 ms
> 16/07/11 14:00:02 DEBUG FlatMappedDStream: Time 1468234791000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234791000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234790000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234789000 ms is valid
> 16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234788000 ms is valid
> 16/07/11 14:00:02 INFO InternalMapWithStateDStream: Time 1468234787000 ms is invalid as zeroTime is 1468234787000 ms and slideDuration is 1000 ms and difference is 0 ms
> 16/07/11 14:00:02 ERROR StreamingContext: Error starting the context, marking it as stopped
> org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
> 	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> 	at org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:530)
> 	at org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:189)
> {quote}
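Two INFO lines in the recovery log above are easy to misread as part of the failure: the list of 12 batches during down time, and the message that time 1468234787000 ms is invalid. The sketch below reproduces the arithmetic those messages describe. It is modeled on the log text rather than copied from Spark's source, the names are hypothetical, and the exclusive upper bound 1468234803000 is an assumption chosen to match the last batch shown in the log.

```scala
object TimeValiditySketch {
  // A batch time counts as valid only if it lies strictly after zeroTime
  // and is a whole number of slide intervals away from it.
  def isTimeValid(timeMs: Long, zeroTimeMs: Long, slideMs: Long): Boolean =
    timeMs > zeroTimeMs && (timeMs - zeroTimeMs) % slideMs == 0

  // Batch times from fromMs (inclusive) to untilMs (exclusive), one per slide interval.
  def batchTimes(fromMs: Long, untilMs: Long, slideMs: Long): Seq[Long] =
    fromMs until untilMs by slideMs

  def main(args: Array[String]): Unit = {
    val zeroTime = 1468234787000L
    val slide = 1000L
    // One interval after zeroTime: valid.
    println(isTimeValid(1468234788000L, zeroTime, slide))
    // Equal to zeroTime (difference 0 ms): invalid, as in the log.
    println(isTimeValid(zeroTime, zeroTime, slide))
    // Batches between the last checkpoint time and the assumed restart bound.
    println(batchTimes(1468234791000L, 1468234803000L, slide).size)
  }
}
```

Under these assumptions the time equal to zeroTime is rejected simply because no batch exists at the stream's starting instant, so that message looks like a normal boundary condition rather than a symptom of the error that follows it.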
> The first exception in the 2nd run (the EOFException) may be related to:
> https://mail-archives.apache.org/mod_mbox/spark-user/201509.mbox/%3CCAMz94CGJzCv6yHW1mOTU2QRX=Pdu2k_PCvJ0++JPNM=uPFgfXg@mail.gmail.com%3E
> and may be what causes the issue (I am not sure what is happening there).
> The issue was also reported to occur when using HDFS.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
