Posted to issues@spark.apache.org by "Stavros Kontopoulos (JIRA)" <ji...@apache.org> on 2016/07/11 11:43:11 UTC

[jira] [Updated] (SPARK-16480) Streaming checkpointing does not work well with SIGTERM

     [ https://issues.apache.org/jira/browse/SPARK-16480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stavros Kontopoulos updated SPARK-16480:
----------------------------------------
    Description: 
A customer gets the following exception when trying to gracefully stop a streaming job with SIGTERM:

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

This implies invalid state in the checkpointed data. It can be reproduced
easily with the following code skeleton (a Kafka direct stream is used):

val dataStream = myKafkaDirectStream
  .mapWithState(stateSpec).stateSnapshots().foreachRDD { data =>

    // back up the state snapshot
    data.cache()
    data.collect().foreach(println)
    data.map { x => x._1 + "," + x._2 }.saveAsTextFile(inputPath)
    ...
  }

inputPath is used both for the initial RDD and as the checkpointing dir
(using a different path for each does not affect the issue).
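
For reference, a rough sketch of how the snapshot written above can be read back (via an existing SparkContext sc) and fed to the StateSpec as the initial state; updateState and the String types are placeholders, not taken from the actual job:

import org.apache.spark.streaming.{State, StateSpec}

// Hypothetical state function: key, value and state are all Strings here.
def updateState(key: String, value: Option[String], state: State[String]): String = {
  value.foreach(state.update)        // keep the latest value as the state
  state.getOption.getOrElse("")
}

// Read the snapshot written by saveAsTextFile above and use it as initial state.
val initialRdd = sc.textFile(inputPath).map { line =>
  val Array(k, v) = line.split(",", 2)
  (k, v)
}

val stateSpec = StateSpec.function(updateState _).initialState(initialRdd)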

The streaming context is created correctly with getOrCreate, and all transformations and actions are defined inside the creating function.
To reproduce, simply run the job, stop it with kill -15 ..., and then restart it.
An empty local fs folder can be used as "inputPath" (the checkpointing folder).
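
The setup roughly follows the usual getOrCreate pattern, sketched below with placeholder names (createContext is not the real factory function from the job):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: all DStream transformations and actions, i.e. the Kafka direct
// stream + mapWithState + foreachRDD skeleton above, are defined inside the
// creating function so they can be recovered from the checkpoint on restart.
def createContext(checkpointDir: String): StreamingContext = {
  val conf = new SparkConf().setAppName("mapwithstate-sigterm-repro")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)
  // ... build myKafkaDirectStream, mapWithState(stateSpec), foreachRDD here ...
  ssc
}

val ssc = StreamingContext.getOrCreate(inputPath, () => createContext(inputPath))
ssc.start()
ssc.awaitTermination()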

The checkpoint data that is updated during the first run:
16/07/11 13:59:51 DEBUG DirectKafkaInputDStream: Updated checkpoint data for time 1468234791000 ms: [
4 checkpoint files 
1468234791000 ms -> [Lscala.Tuple4;@60a55c45
1468234790000 ms -> [Lscala.Tuple4;@58e5be3
1468234789000 ms -> [Lscala.Tuple4;@13cf6be7
1468234788000 ms -> [Lscala.Tuple4;@6017d6ae
]
16/07/11 13:59:51 DEBUG InternalMapWithStateDStream: Updated checkpoint data for time 1468234791000 ms: [
0 checkpoint files 

]
16/07/11 13:59:51 DEBUG FlatMappedDStream: Updated checkpoint data for time 1468234791000 ms: [
0 checkpoint files 

]
16/07/11 13:59:51 DEBUG ForEachDStream: Updated checkpoint data for time 1468234791000 ms: [
0 checkpoint files 

]

The invalid-timestamp issue that causes the exception after restart:

16/07/11 14:00:02 DEBUG FileBasedWriteAheadLogReader: Error reading next item, EOF reached
java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.streaming.util.FileBasedWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:432)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$recoverPastEvents$1.apply(ReceivedBlockTracker.scala:212)
	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$recoverPastEvents$1.apply(ReceivedBlockTracker.scala:210)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.recoverPastEvents(ReceivedBlockTracker.scala:210)
	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.<init>(ReceivedBlockTracker.scala:81)
	at org.apache.spark.streaming.scheduler.ReceiverTracker.<init>(ReceiverTracker.scala:106)
	at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:80)
	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:610)
	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
	at org.apache.spark.util.ThreadUtils$$anon$1.run(ThreadUtils.scala:122)
16/07/11 14:00:02 INFO JobGenerator: Batches during down time (12 batches): 1468234791000 ms, 1468234792000 ms, 1468234793000 ms, 1468234794000 ms, 1468234795000 ms, 1468234796000 ms, 1468234797000 ms, 1468234798000 ms, 1468234799000 ms, 1468234800000 ms, 1468234801000 ms, 1468234802000 ms
16/07/11 14:00:02 INFO JobGenerator: Batches pending processing (0 batches): 
16/07/11 14:00:02 INFO JobGenerator: Batches to reschedule (12 batches): 1468234791000 ms, 1468234792000 ms, 1468234793000 ms, 1468234794000 ms, 1468234795000 ms, 1468234796000 ms, 1468234797000 ms, 1468234798000 ms, 1468234799000 ms, 1468234800000 ms, 1468234801000 ms, 1468234802000 ms
16/07/11 14:00:02 DEBUG DStreamGraph: Generating jobs for time 1468234791000 ms
16/07/11 14:00:02 DEBUG FlatMappedDStream: Time 1468234791000 ms is valid
16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234791000 ms is valid
16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234790000 ms is valid
16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234789000 ms is valid
16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234788000 ms is valid
16/07/11 14:00:02 INFO InternalMapWithStateDStream: Time 1468234787000 ms is invalid as zeroTime is 1468234787000 ms and slideDuration is 1000 ms and difference is 0 ms
16/07/11 14:00:02 ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:530)
	at org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:189)

The first exception may be related to this earlier report:

https://mail-archives.apache.org/mod_mbox/spark-user/201509.mbox/%3CCAMz94CGJzCv6yHW1mOTU2QRX=Pdu2k_PCvJ0++JPNM=uPFgfXg@mail.gmail.com%3E

and may be what causes the issue (it is not clear what is happening there).

In the first run I successfully see:
16/07/11 13:59:49 INFO StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
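
That log line is presumably the result of enabling the standard graceful-shutdown flag; the configuration below is assumed, not copied from the actual job:

// Assumed configuration: the driver's shutdown hook stops the StreamingContext
// gracefully (finishing in-flight batches) on SIGTERM when this flag is set.
val conf = new SparkConf()
  .setAppName("mapwithstate-sigterm-repro")
  .set("spark.streaming.stopGracefullyOnShutdown", "true")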

The issue has also been reported to occur when using HDFS.


> Streaming checkpointing does not work well with SIGTERM
> -------------------------------------------------------
>
>                 Key: SPARK-16480
>                 URL: https://issues.apache.org/jira/browse/SPARK-16480
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.1
>            Reporter: Stavros Kontopoulos
>


