Posted to user@spark.apache.org by Svend <sv...@gmail.com> on 2014/09/25 16:20:40 UTC

Systematic error when re-starting Spark stream unless I delete all checkpoints

I am experiencing Spark Streaming restart issues similar to those discussed
in the two threads below (in which I failed to find a solution). Could anybody
let me know whether something is wrong in the way I start and stop the stream,
or whether this could be a Spark bug?

http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-td14847.html
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaReciever-Error-when-starting-ssc-Actor-name-not-unique-tc3978.html

My stream reads a Kafka topic, does some processing involving
updateStateByKey and saves the result to HDFS.

The context is (re)-created at startup as follows: 



And the start-up and shutdown of the stream is handled as follows: 




When starting the stream for the first time (with spark-submit), the
processing happens successfully, folders are created in the target HDFS
folder and streaming stats are visible on http://sparkhost:4040/streaming.

After letting the stream run for several minutes and then stopping it
(ctrl-c on the command line), the following is visible in the
checkpoint folder:



(checkpoint clean-up seems to happen since the stream ran for much more than
5 times 10 seconds)

When re-starting the stream, the startup fails with the error below,
http://sparkhost:4040/streaming shows no statistics, no new HDFS folder is
added in the target folder and no new checkpoints are created:






Now if I delete all older checkpoints and keep only the most recent one: 



I end up with this (Kafka?) non-unique actor name error:



If I delete the checkpoint folder, the stream starts successfully (but I lose
my ongoing stream state, obviously).

We're running Spark 1.1.0 on Mesos 0.20. Our Spark assembly is packaged with
CDH 5.1.0 and Hive:



Any comment or suggestion would be greatly appreciated.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Systematic-error-when-re-starting-Spark-stream-unless-I-delete-all-checkpoints-tp15142.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Systematic error when re-starting Spark stream unless I delete all checkpoints

Posted by Svend Vanderveken <sv...@gmail.com>.
Hi again,


Just FYI, I found the mistake in my code regarding the restartability of Spark
Streaming: I had a method providing a context (either retrieved from the
checkpoint or, if no checkpoint was available, built anew), and I was then
building and starting a stream on whatever context it returned.

The mistake is that we should not build a stream on a context retrieved
from a checkpoint, since it already contains all the necessary elements. The
stream should instead be built inside the function that creates a new
context, as also illustrated in the RecoverableNetworkWordCount example:
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
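
For anyone hitting the same errors, here is a minimal sketch of that pattern
against the Spark 1.x API. It is not my actual job: the object name, host
names, paths, topic, group id and the counting logic are all made up for
illustration, and it assumes the spark-streaming-kafka artifact is on the
classpath. The point is only that the whole stream is defined inside the
function handed to StreamingContext.getOrCreate, and nothing is added to the
context afterwards.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits, needed on Spark 1.x
import org.apache.spark.streaming.kafka.KafkaUtils

object RestartableStream {

  // Hypothetical locations, for illustration only.
  val checkpointDir = "hdfs://namenode:8020/tmp/example-checkpoints/"
  val outputPrefix  = "hdfs://namenode:8020/tmp/example-output/counts"

  // Keeps a running count per key across batches.
  def updateCount(newValues: Seq[Long], state: Option[Long]): Option[Long] =
    Some(state.getOrElse(0L) + newValues.sum)

  // Invoked only when no usable checkpoint is found.
  // The complete DStream graph is defined here and nowhere else.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("restartable-stream")
    val ssc  = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)

    // Hypothetical ZooKeeper quorum, consumer group and topic.
    val messages = KafkaUtils.createStream(
      ssc, "zkhost:2181", "example-group", Map("example-topic" -> 1))

    messages
      .map { case (_, value) => (value, 1L) }
      .updateStateByKey[Long](updateCount _)
      .saveAsTextFiles(outputPrefix)

    ssc
  }

  def main(args: Array[String]): Unit = {
    // Either restores context and graph from the checkpoint, or calls createContext().
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)

    // Do not define any DStream here: a restored context already has its graph.
    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this layout the first run goes through createContext(), and later runs
rebuild the context and its streams from the checkpoint data, so the
stream-building code is never applied to a recovered context.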

S


Re: Systematic error when re-starting Spark stream unless I delete all checkpoints

Posted by Svend Vanderveken <sv...@gmail.com>.
Hi all,

I apologise for re-posting this; I realise some mail systems are filtering
out all the code samples from the original post.

I would greatly appreciate any pointers on this: the issue basically
renders Spark Streaming not fault-tolerant for us.

Thanks in advance,

S



---

"
I am experiencing Spark Streaming restart issues similar to those discussed
in the two threads below (in which I failed to find a solution). Could anybody
let me know whether something is wrong in the way I start and stop the stream,
or whether this could be a Spark bug?

http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-td14847.html
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaReciever-Error-when-starting-ssc-Actor-name-not-unique-tc3978.html

My stream reads a Kafka topic, does some processing involving
updateStateByKey and saves the result to HDFS.

The context is (re)-created at startup as follows:

def streamContext() = {

    def newContext() = {
      val ctx = new StreamingContext(sparkConf, Duration(10000))
      ctx.checkpoint("hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/")
      ctx
    }

    StreamingContext.getOrCreate("hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/", newContext)
  }


And the start-up and shutdown of the stream is handled as follows:

try {

    val sparkContext = streamContext()

    [.. build stream here...]

    sparkContext.start()
    sparkContext.awaitTermination()

  } catch {
      case e: Throwable =>
        log.error("shutting down tabulation stream...", e)
        sparkContext.stop()
        log.info("...waiting termination...")
        sparkContext.awaitTermination()
        log.info("...tabulation stream stopped")
  }



When starting the stream for the first time (with spark-submit), the
processing happens successfully, folders are created in the target HDFS
folder and streaming stats are visible on http://sparkhost:4040/streaming.

After letting the stream run for several minutes and then stopping it
(ctrl-c on the command line), the following is visible in the
checkpoint folder:

mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
14/09/25 09:39:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 11 items
drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
-rw-r--r--   3 mnubohadoop hadoop       5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000
-rw-r--r--   3 mnubohadoop hadoop       5512 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000.bk
-rw-r--r--   3 mnubohadoop hadoop       5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000
-rw-r--r--   3 mnubohadoop hadoop       5507 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000.bk
-rw-r--r--   3 mnubohadoop hadoop       5476 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000
-rw-r--r--   3 mnubohadoop hadoop       5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000.bk
-rw-r--r--   3 mnubohadoop hadoop       5477 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000
-rw-r--r--   3 mnubohadoop hadoop       5506 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000.bk
-rw-r--r--   3 mnubohadoop hadoop       5484 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000
-rw-r--r--   3 mnubohadoop hadoop       5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000.bk
mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
14/09/25 09:42:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8438
drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8542


(checkpoint clean-up seems to happen since the stream ran for much more
than 5 times 10 seconds)

When re-starting the stream, the startup fails with the error below,
http://sparkhost:4040/streaming shows no statistics, no new HDFS folder is
added in the target folder and no new checkpoints are created:

09:45:05.038 [main] ERROR c.mnubo.analytic.tabulate.StreamApp - shutting down tabulation stream...
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.FilteredDStream@e8949a1 has not been initialized
        at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:263) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:290) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:210) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:209) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:209) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:80) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:66) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:444) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]



mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
14/09/25 09:48:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 12 items
drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:43 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/a9dded7b-a288-44da-89d0-0309a73fab3a
-rw-r--r--   3 mnubohadoop hadoop       5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000
-rw-r--r--   3 mnubohadoop hadoop       5512 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000.bk
-rw-r--r--   3 mnubohadoop hadoop       5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000
-rw-r--r--   3 mnubohadoop hadoop       5507 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000.bk
-rw-r--r--   3 mnubohadoop hadoop       5476 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000
-rw-r--r--   3 mnubohadoop hadoop       5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000.bk
-rw-r--r--   3 mnubohadoop hadoop       5477 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000
-rw-r--r--   3 mnubohadoop hadoop       5506 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000.bk
-rw-r--r--   3 mnubohadoop hadoop       5484 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000
-rw-r--r--   3 mnubohadoop hadoop       5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000.bk


Now if I delete all older checkpoints and keep only the most recent one:

mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
14/09/25 10:06:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:43 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/a9dded7b-a288-44da-89d0-0309a73fab3a
-rw-r--r--   3 mnubohadoop hadoop       5484 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000
-rw-r--r--   3 mnubohadoop hadoop       5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000.bk


I end up with this (Kafka?) non-unique actor name error:

10:07:25.088 [Result resolver thread-0] WARN  o.a.spark.scheduler.TaskSetManager - Lost task 1.0 in stage 3.0 (TID 73, vm21-hulk-priv.mtl.mnubo.com): akka.actor.InvalidActorNameException: actor name [Receiver-0-1411654045063] is not unique!
        akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
        akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
        akka.actor.ActorCell.reserveChild(ActorCell.scala:338)
        akka.actor.dungeon.Children$class.makeChild(Children.scala:186)
        akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
        akka.actor.ActorCell.attachChild(ActorCell.scala:338)
        akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:518)
        org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.<init>(ReceiverSupervisorImpl.scala:67)
        org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:263)
        org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)


If I delete the checkpoint folder, the stream starts successfully (but I
lose my ongoing stream state, obviously).

We're running Spark 1.1.0 on Mesos 0.20. Our Spark assembly is packaged
with CDH 5.1.0 and Hive:

sbt/sbt clean assembly/assembly -Dhadoop.version=2.3.0-mr1-cdh5.1.0 -Phive
./make-distribution.sh --tgz --skip-java-test -Dhadoop.version=2.3.0-mr1-cdh5.1.0 -Phive


Any comment or suggestion would be greatly appreciated.
"
