Posted to user@spark.apache.org by Shushant Arora <sh...@gmail.com> on 2015/07/29 22:15:38 UTC

broadcast variable and accumulators issue while spark streaming checkpoint recovery

Hi

I am using Spark Streaming 1.3 with checkpointing, but the job is failing to recover from the checkpoint on restart.

1. For the broadcast variable it reports:

WARN TaskSetManager: Lost task 15.0 in stage 7.0 (TID 1269, hostIP):
java.lang.ClassCastException: [B cannot be cast to pkg.broadcastvariableclassname

at the point where I call bcvariable.value() in the map function:

        at mapfunction......
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$3$1.apply(JavaDStreamLike.scala:184)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$3$1.apply(JavaDStreamLike.scala:184)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

2. For the accumulator it reports:

15/07/29 19:23:12 ERROR DAGScheduler: Failed to update accumulators for ResultTask(1, 16)
java.util.NoSuchElementException: key not found: 2
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:58)
        at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:894)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:893)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:893)

This is described in https://issues.apache.org/jira/browse/SPARK-5206.

I can afford to reset the accumulator to 0 on stream restart. Is it possible to make this work?

Thanks

Re: broadcast variable and accumulators issue while spark streaming checkpoint recovery

Posted by Tathagata Das <td...@databricks.com>.
1. The same way, using static fields in a class (a minimal Java sketch of this follows below).
2. Yes, the same way.
3. Yes, you can do that. To differentiate "first time" vs. "continuing", you have to build your own semantics. For example, if the HDFS location where you are supposed to store the offsets does not have any data, the job is probably running for the first time; otherwise it is continuing, so read the offsets and continue from there (a rough sketch of that check also follows below).
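
To make (1) and (2) concrete, here is a minimal Java sketch. SharedStateHolder, the lookup map, and loadLookupTable() are hypothetical names, not code from this thread; it only illustrates the static-field idea:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

// Hypothetical holder: the static fields start out null after a driver restart,
// so the accumulator and broadcast are created fresh on the recovered driver
// the first time a batch runs, instead of stale references being read back
// from the checkpoint.
public class SharedStateHolder {

    private static volatile Accumulator<Integer> counter;
    private static volatile Broadcast<Map<String, String>> lookup;

    public static Accumulator<Integer> getOrCreateAccumulator(JavaSparkContext jsc) {
        if (counter == null) {
            synchronized (SharedStateHolder.class) {
                if (counter == null) {
                    counter = jsc.accumulator(0); // starts again from 0 after a restart
                }
            }
        }
        return counter;
    }

    public static Broadcast<Map<String, String>> getOrCreateBroadcast(JavaSparkContext jsc) {
        if (lookup == null) {
            synchronized (SharedStateHolder.class) {
                if (lookup == null) {
                    lookup = jsc.broadcast(loadLookupTable());
                }
            }
        }
        return lookup;
    }

    private static Map<String, String> loadLookupTable() {
        // Placeholder: in a real job this would come from a file, a DB, etc.
        return new HashMap<String, String>();
    }
}

Per batch it would be fetched inside transform rather than captured once at job setup, mirroring the Scala snippet further down this thread (again just a sketch, assuming Java 8 lambdas and a JavaDStream<String> called lines):

JavaDStream<String> out = lines.transform(rdd -> {
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(rdd.context());
    Accumulator<Integer> acc = SharedStateHolder.getOrCreateAccumulator(jsc);
    Broadcast<Map<String, String>> bc = SharedStateHolder.getOrCreateBroadcast(jsc);
    return rdd.map(x -> { acc.add(1); return bc.value().getOrDefault(x, x); });
});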
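
And for (3), a rough sketch of the "does the HDFS offsets location have any data" check using the Hadoop FileSystem API; the path and layout here are assumptions, nothing in this thread prescribes them:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OffsetStore {

    // Hypothetical location where the job writes offsets at the end of each batch.
    private static final String OFFSETS_DIR = "hdfs:///user/streamingjob/kafka-offsets";

    // True on the very first run, i.e. before any offsets have been written.
    public static boolean isFirstRun() throws IOException {
        Path dir = new Path(OFFSETS_DIR);
        FileSystem fs = dir.getFileSystem(new Configuration());
        return !fs.exists(dir);
    }
}

On the first run you would start the Kafka stream from its default position; on later runs you would read the stored offsets and pass them to the createDirectStream overload that takes explicit fromOffsets, and keep overwriting the stored offsets at the end of each batch.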

On Wed, Jul 29, 2015 at 9:38 PM, Shushant Arora <sh...@gmail.com>
wrote:

> 1.How to do it in java?
> 2.Can broadcast objects also be created in same way  after checkpointing.
> 3.Is it safe If I disable checkpoint and write offsets at end of each
> batch to hdfs in mycode  and somehow specify in my job to use this offset
> for creating kafkastream at first time. How can I specify
> javasparkstreaming context to use this offsets while creating kafkastream
> at first time only and after that use from previous batch interval's
> offsets..
>
> On Thu, Jul 30, 2015 at 2:49 AM, Tathagata Das <td...@databricks.com>
> wrote:
>
>> Rather than using accumulator directly, what you can do is something like
>> this to lazily create an accumulator and use it (will get lazily recreated
>> if driver restarts from checkpoint)
>>
>>
>> dstream.transform { rdd =>
>>     // Singleton object method that creates an accumulator or returns the already created one.
>>     val accum = SingletonObject.getOrCreateAccumulator()
>>     rdd.map { x =>  // use accum  }
>> }

Re: broadcast variable and accumulators issue while spark streaming checkpoint recovery

Posted by Shushant Arora <sh...@gmail.com>.
1. How do I do this in Java?
2. Can broadcast objects also be created the same way after checkpointing?
3. Is it safe if I disable checkpointing, write the offsets to HDFS at the end of each batch in my code, and somehow tell my job to use those offsets when creating the Kafka stream? How can I make the JavaStreamingContext use these offsets only when creating the Kafka stream for the first time, and after that continue from the previous batch interval's offsets?

On Thu, Jul 30, 2015 at 2:49 AM, Tathagata Das <td...@databricks.com> wrote:

> Rather than using accumulator directly, what you can do is something like
> this to lazily create an accumulator and use it (will get lazily recreated
> if driver restarts from checkpoint)
>
>
> dstream.transform { rdd =>
>     val accum = SingletonObject.getOrCreateAccumulator()   // single
> object method to create an accumulator or get an already created one.
>     rdd.map { x =>  /// use accum  }
> }

Re: broadcast variable and accumulators issue while spark streaming checkpoint recovery

Posted by Tathagata Das <td...@databricks.com>.
Rather than using the accumulator directly, what you can do is something like this to lazily create an accumulator and use it (it will get lazily recreated if the driver restarts from the checkpoint):


dstream.transform { rdd =>
    // Singleton object method that creates an accumulator or returns the already created one.
    val accum = SingletonObject.getOrCreateAccumulator()
    rdd.map { x =>  // use accum  }
}

