You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by jan <ja...@insidin.com> on 2015/11/23 16:22:18 UTC

Spark-1.6.0-preview2 trackStateByKey exception restoring state

Hi guys,

I'm trying out the new trackStateByKey API of the Spark-1.6.0-preview2
release and I'm encountering an exception when trying to restore previously
checkpointed state in spark streaming.

Use case:
- execute a stateful Spark streaming job using trackStateByKey
- interrupt / kill the job
- start the job again (without any code changes or cleaning out the
checkpoint directory)

Upon this restart, I encounter the exception below. The nature of the
exception makes me think either I am doing something wrong, or there's a
problem with this use case for the new trackStateByKey API.

I uploaded my job code (
https://gist.github.com/juyttenh/be7973b0c5c2eddd8a81), but it's basically
just a modified version of the spark streaming example
StatefulNetworkWordCount (that had already been updated to use
trackStateByKey). My version however includes the usage of
StreamingContext.getOrCreate to actually read the checkpointed state when
the job is started, leading to the exception below.

Just to make sure: using StreamingContext.getOrCreate should still be
compatible with using the trackStateByKey API?

Thanx,
Jan

15/11/23 10:55:07 ERROR StreamingContext: Error starting the context,
marking it as stopped

java.lang.IllegalArgumentException: requirement failed

at scala.Predef$.require(Predef.scala:221)

at
org.apache.spark.streaming.rdd.TrackStateRDD.<init>(TrackStateRDD.scala:133)

at
org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:148)

at
org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:143)

at scala.Option.map(Option.scala:145)

at
org.apache.spark.streaming.dstream.InternalTrackStateDStream.compute(TrackStateDStream.scala:143)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)

at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)

at scala.Option.orElse(Option.scala:257)

at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)

at
org.apache.spark.streaming.dstream.TrackStateDStreamImpl.compute(TrackStateDStream.scala:66)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)

at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)

at scala.Option.orElse(Option.scala:257)

at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)

at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)

at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)

at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)

at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)

at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)

at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)

at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:231)

at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:226)

at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

at
org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:226)

at
org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:96)

at
org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83)

at
org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:609)

at
org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605)

at
org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605)

at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()

at
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:605)

at
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:599)

at
org.apache.spark.examples.streaming.StatefulNetworkWordCount$.main(StatefulNetworkWordCount.scala:48)

at
org.apache.spark.examples.streaming.StatefulNetworkWordCount.main(StatefulNetworkWordCount.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:483)

at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:727)

at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)

at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)




--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-1-6-0-preview2-trackStateByKey-exception-restoring-state-tp15318.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

Re: Spark-1.6.0-preview2 trackStateByKey exception restoring state

Posted by Tathagata Das <td...@databricks.com>.
My intention is to make it compatible! Filed this bug -
https://issues.apache.org/jira/browse/SPARK-11932
Looking into it right now. Thanks for testing it out and reporting this!


On Mon, Nov 23, 2015 at 7:22 AM, jan <ja...@insidin.com> wrote:

> Hi guys,
>
> I'm trying out the new trackStateByKey API of the Spark-1.6.0-preview2
> release and I'm encountering an exception when trying to restore previously
> checkpointed state in spark streaming.
>
> Use case:
> - execute a stateful Spark streaming job using trackStateByKey
> - interrupt / kill the job
> - start the job again (without any code changes or cleaning out the
> checkpoint directory)
>
> Upon this restart, I encounter the exception below. The nature of the
> exception makes me think either I am doing something wrong, or there's a
> problem with this use case for the new trackStateByKey API.
>
> I uploaded my job code (
> https://gist.github.com/juyttenh/be7973b0c5c2eddd8a81), but it's
> basically just a modified version of the spark streaming example
> StatefulNetworkWordCount (that had already been updated to use
> trackStateByKey). My version however includes the usage of
> StreamingContext.getOrCreate to actually read the checkpointed state when
> the job is started, leading to the exception below.
>
> Just to make sure: using StreamingContext.getOrCreate should still be
> compatible with using the trackStateByKey API?
>
> Thanx,
> Jan
>
> 15/11/23 10:55:07 ERROR StreamingContext: Error starting the context,
> marking it as stopped
>
> java.lang.IllegalArgumentException: requirement failed
>
> at scala.Predef$.require(Predef.scala:221)
>
> at
> org.apache.spark.streaming.rdd.TrackStateRDD.<init>(TrackStateRDD.scala:133)
>
> at
> org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:148)
>
> at
> org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:143)
>
> at scala.Option.map(Option.scala:145)
>
> at
> org.apache.spark.streaming.dstream.InternalTrackStateDStream.compute(TrackStateDStream.scala:143)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>
> at scala.Option.orElse(Option.scala:257)
>
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>
> at
> org.apache.spark.streaming.dstream.TrackStateDStreamImpl.compute(TrackStateDStream.scala:66)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>
> at scala.Option.orElse(Option.scala:257)
>
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
>
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
>
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
>
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>
> at
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
>
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:231)
>
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:226)
>
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>
> at
> org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:226)
>
> at
> org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:96)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83)
>
> at
> org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:609)
>
> at
> org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605)
>
> at
> org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605)
>
> at ... run in separate thread using org.apache.spark.util.ThreadUtils ...
> ()
>
> at
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:605)
>
> at
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:599)
>
> at
> org.apache.spark.examples.streaming.StatefulNetworkWordCount$.main(StatefulNetworkWordCount.scala:48)
>
> at
> org.apache.spark.examples.streaming.StatefulNetworkWordCount.main(StatefulNetworkWordCount.scala)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:483)
>
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:727)
>
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> ------------------------------
> View this message in context: Spark-1.6.0-preview2 trackStateByKey
> exception restoring state
> <http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-1-6-0-preview2-trackStateByKey-exception-restoring-state-tp15318.html>
> Sent from the Apache Spark Developers List mailing list archive
> <http://apache-spark-developers-list.1001551.n3.nabble.com/> at
> Nabble.com.
>