Posted to user@spark.apache.org by Shixiong Zhu <zs...@gmail.com> on 2014/11/07 09:03:25 UTC

Re: Bug in Accumulators...

Could you provide all the code needed to reproduce the bug? Here is
my test code:

import org.apache.spark._
import org.apache.spark.SparkContext._

object SimpleApp {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SimpleApp")
    val sc = new SparkContext(conf)

    val accum = sc.accumulator(0)
    for (i <- 1 to 10) {
      sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    }
    sc.stop()
  }
}

It works fine in both client and cluster mode. Since this is a serialization
bug, the outer class does matter. Could you provide it? Is there
a SparkContext field in the outer class?
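
As a rough illustration of why a SparkContext field in the outer class
matters, the sketch below reproduces the same kind of failure with plain Java
serialization and no Spark at all. FakeContext, Outer, Task and
SerializationCheck are hypothetical names made up for this example.
FakeContext only stands in for SparkContext, and Task is a hand-written
substitute for the closure class that the Scala compiler would generate.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately NOT Serializable.
class FakeContext

// An outer class that is Serializable but keeps the context as a field.
class Outer(val ctx: FakeContext) extends Serializable

// A hand-written "closure": it only needs `factor`, but it also holds a
// reference to `outer`, so serializing it walks into Outer and its fields.
class Task(val outer: Outer, val factor: Int) extends Serializable {
  def apply(x: Int): Int = x * factor
}

object SerializationCheck {
  // Returns None if `obj` serializes, otherwise the name of the class
  // that Java serialization refused to write.
  def firstNonSerializable(obj: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); None }
    catch { case e: NotSerializableException => Some(e.getMessage) }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val task = new Task(new Outer(new FakeContext), 2)
    println(firstNonSerializable(task))
  }
}
```

Running main should name the stand-in class as the one that cannot be
serialized, which is the same shape of failure as a
"NotSerializableException: org.apache.spark.SparkContext".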

Best Regards,
Shixiong Zhu

2014-10-28 0:28 GMT+08:00 octavian.ganea <oc...@inf.ethz.ch>:

> I am also using Spark 1.1.0 and I ran it on a cluster of nodes (it works
> if I run it in local mode!)
>
> If I put the accumulator inside the for loop, everything will work fine. I
> guess the bug is that an accumulator can be applied to JUST one RDD.
>
> Still another undocumented 'feature' of Spark that no one from the people
> who maintain Spark is willing to solve or at least to tell us about ...
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p17372.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: Bug in Accumulators...

Posted by Sean Owen <so...@cloudera.com>.
Here, the Main object is not meant to be serialized. transient ought
to be for fields within an object that is legitimately supposed to be
serialized, but whose value can be recreated on deserialization. I feel
like marking objects that aren't logically Serializable as such is a
hack, and transient extends that hack and will cause surprises later.

Hack away for toy examples but ideally the closure cleaner would snip
whatever phantom reference is at work here. I usually try to rewrite
the Scala as you say to avoid the issue rather than make things
Serializable ad hoc.
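
One common way to rewrite code so that nothing needs to be marked
Serializable is to copy the needed value into a local val before building the
function. The sketch below is only an illustration with plain Java
serialization, assuming Scala 2.12 or later; Driver, RewriteCheck and the
method names are hypothetical and are not taken from the code in this thread.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical driver-side class. Like the Main object discussed above,
// it is not meant to be serialized.
class Driver(val factor: Int) {
  // `factor` here means `this.factor`, so the function captures the
  // whole Driver instance.
  def capturingThis: Int => Int = x => x * factor

  // The field is copied into a local val first, so the function only
  // captures an Int.
  def capturingLocal: Int => Int = {
    val f = factor
    x => x * f
  }
}

object RewriteCheck {
  // True if `obj` can be written with plain Java serialization.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val d = new Driver(3)
    println(serializes(d.capturingThis))
    println(serializes(d.capturingLocal))
  }
}
```

The first function drags the non-serializable Driver along with it, while the
second one does not, so Driver itself never has to pretend to be
Serializable.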

On Sun, Nov 23, 2014 at 10:49 AM, Aaron Davidson <il...@gmail.com> wrote:
> As Mohit said, making Main extend Serializable should fix this example. In
> general, it's not a bad idea to mark the fields you don't want to serialize
> (e.g., sc and conf in this case) as @transient as well, though this is not
> the issue in this case.
>
> Note that this problem would not have arisen in your very specific example
> if you used a while loop instead of a for-each loop, but that's really more
> of a happy coincidence than something you should rely on, as nested lambdas
> are virtually unavoidable in Scala.


Re: Bug in Accumulators...

Posted by Aaron Davidson <il...@gmail.com>.
As Mohit said, making Main extend Serializable should fix this example. In
general, it's not a bad idea to mark the fields you don't want to serialize
(e.g., sc and conf in this case) as @transient as well, though this is not
the issue in this case.

Note that this problem would not have arisen in your very specific example
if you used a while loop instead of a for-each loop, but that's really more
of a happy coincidence than something you should rely on, as nested lambdas
are virtually unavoidable in Scala.
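
To make the @transient suggestion concrete, here is a small sketch that uses
plain Java serialization rather than Spark. FakeContext, Job and
TransientCheck are hypothetical names invented for this example, and
FakeContext is only a stand-in for a non-serializable object such as sc.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately NOT Serializable.
class FakeContext

// The enclosing class is marked Serializable, and the field that should
// not travel with it is marked @transient so serialization skips it.
class Job(@transient val ctx: FakeContext, val factor: Int) extends Serializable

object TransientCheck {
  // Serializes `obj` to bytes and reads it back, as if it had been
  // shipped to another JVM.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new Job(new FakeContext, 2))
    println(copy.factor)      // the ordinary field survives the round trip
    println(copy.ctx == null) // the transient field is not restored
  }
}
```

Note that the transient field comes back as null on the receiving side, so
any code that touches it after deserialization has to recreate it or avoid
it.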

On Sat, Nov 22, 2014 at 5:16 PM, Mohit Jaggi <mo...@gmail.com> wrote:

> perhaps the closure ends up including the "main" object which is not
> defined as serializable...try making it a "case object" or "object main
> extends Serializable".

Re: Bug in Accumulators...

Posted by Mohit Jaggi <mo...@gmail.com>.
perhaps the closure ends up including the "main" object which is not
defined as serializable...try making it a "case object" or "object main
extends Serializable".


Re: Bug in Accumulators...

Posted by lordjoe <lo...@gmail.com>.
I posted several examples in java at http://lordjoesoftware.blogspot.com/

Generally code like this works and I show how to accumulate more complex
values.

    // Make two accumulators using Statistics
    final Accumulator<Integer> totalLetters = ctx.accumulator(0, "ttl");
    JavaRDD<String> lines = ...

    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(final String s) throws Exception {
            // Handle accumulator here
            totalLetters.add(s.length()); // count all letters
            ....
        }
    });
    ....
    // totalCounts is presumably the second accumulator, which is not shown above
    Long numberCalls = totalCounts.value();

I believe the mistake is to pass the accumulator to the function rather than
letting the function find the accumulator. In this case I do that by using
a final local variable.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p19579.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Bug in Accumulators...

Posted by "octavian.ganea" <oc...@inf.ethz.ch>.
Hi Sowen,

You're right, that example works, but here is an example that does not work
for me:

import org.apache.spark._
import org.apache.spark.SparkContext._

object Main {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("name")
    val sc = new SparkContext(conf)
    val accum = sc.accumulator(0)
    for (i <- 1 to 10) {
      val y = sc.parallelize(Array(1, 2, 3, 4))
        .mapPartitions(x => foo(x, accum))
        .reduce(_ + _)
    }
    println("Result : " + accum.value)
    sc.stop
  }

  // Adds every element of the partition to the accumulator and
  // returns an empty iterator.
  def foo(i: Iterator[Int], a: Accumulator[Int]): Iterator[Int] = {
    while (i.hasNext) a += i.next
    List().iterator
  }
}

Running it on a cluster with 16 nodes gives this error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
	at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
	at EL_LBP_Spark$$anonfun$main$1.apply$mcVI$sp(EL_LBP_Spark.scala:16)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
	at EL_LBP_Spark$.main(EL_LBP_Spark.scala:15)
	at EL_LBP_Spark.main(EL_LBP_Spark.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
	... 14 more


Seems that there is a problem with mapPartitions ...

Thanks for your suggestion,



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p19576.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Bug in Accumulators...

Posted by Sean Owen <so...@cloudera.com>.
That seems to work fine. Add to your example

def foo(i: Int, a: Accumulator[Int]) = a += i

and add an action at the end to get the expression to evaluate:

sc.parallelize(Array(1, 2, 3, 4)).map(x => foo(x,accum)).foreach(println)

and it works, and you have accum with value 10 at the end.

The similar example at
http://spark.apache.org/docs/latest/programming-guide.html#accumulators
also works.

You say AFAIK -- are you actually able to reproduce this?
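
The need for an action comes from map being lazy on an RDD: nothing runs, and
no accumulator is updated, until something consumes the result. As a loose
analogy only, the sketch below shows the same effect with a plain Scala
Iterator instead of an RDD. LazyMapDemo and run are hypothetical names, and a
local var plays the role of the accumulator.

```scala
object LazyMapDemo {
  // Returns the running total before and after the mapped iterator
  // is actually consumed.
  def run(): (Int, Int) = {
    var sum = 0
    // Iterator.map is lazy: the function is not applied yet.
    val mapped = Iterator(1, 2, 3, 4).map { x => sum += x; x }
    val before = sum
    // Consuming the iterator plays the role of a Spark action.
    mapped.foreach(_ => ())
    val after = sum
    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = run()
    println("before consuming: " + before)
    println("after consuming: " + after)
  }
}
```

The side effect only shows up once the mapped values are consumed, which is
why the accumulator holds 10 only after foreach has run.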

On Sat, Nov 22, 2014 at 7:01 PM, octavian.ganea
<oc...@inf.ethz.ch> wrote:
> One month later, the same problem. I think that someone (e.g. the inventors of
> Spark) should show us a big example of how to use accumulators. To start,
> we need to see an example of the following form:
>
> val accum = sc.accumulator(0)
> sc.parallelize(Array(1, 2, 3, 4)).map(x => foo(x,accum))
>
> Passing accum as a parameter to function foo will require it to be
> serializable, but AFAIK any accumulator encapsulates the SparkContext sc,
> which is not serializable, and this leads to a
> "java.io.NotSerializableException: SparkContext" exception.
>
> I am really curious to see a real application that uses accumulators.
> Otherwise, you have to change their code such that the above issue does not
> appear anymore.
>
> Best,


Re: Bug in Accumulators...

Posted by "octavian.ganea" <oc...@inf.ethz.ch>.
One month later, the same problem. I think that someone (e.g. the inventors of
Spark) should show us a big example of how to use accumulators. To start,
we need to see an example of the following form:

val accum = sc.accumulator(0)
sc.parallelize(Array(1, 2, 3, 4)).map(x => foo(x,accum))

Passing accum as a parameter to function foo will require it to be
serializable, but AFAIK any accumulator encapsulates the SparkContext sc,
which is not serializable, and this leads to a
"java.io.NotSerializableException: SparkContext" exception.

I am really curious to see a real application that uses accumulators.
Otherwise, you have to change their code such that the above issue does not
appear anymore.

Best,



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p19567.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Bug in Accumulators...

Posted by Jake Mannix <ja...@gmail.com>.
I'm running into similar problems with accumulators failing to serialize
properly.  Are there any examples of accumulators being used in more
complex environments than simply initializing them in the same class and
then using them in a .foreach() on an RDD referenced a few lines below?

From the look of the error above, it seems that any added Scala complexity
at all causes the closure cleaner to freak out with accumulators...

On Fri, Nov 7, 2014 at 12:12 AM, Aaron Davidson <il...@gmail.com> wrote:

> This may be due in part to Scala allocating an anonymous inner class in
> order to execute the for loop. I would expect if you change it to a while
> loop like
>
> var i = 0
> while (i < 10) {
>   sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
>   i += 1
> }
>
> then the problem may go away. I am not super familiar with the closure
> cleaner, but I believe that we cannot prune beyond 1 layer of references,
> so the extra class of nesting may be screwing something up. If this is the
> case, then I would also expect replacing the accumulator with any other
> reference to the enclosing scope (such as a broadcast variable) would have
> the same result.


-- 

  -jake

Re: Bug in Accumulators...

Posted by Aaron Davidson <il...@gmail.com>.
This may be due in part to Scala allocating an anonymous inner class in
order to execute the for loop. I would expect if you change it to a while
loop like

var i = 0
while (i < 10) {
  sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
  i += 1
}

then the problem may go away. I am not super familiar with the closure
cleaner, but I believe that we cannot prune beyond 1 layer of references,
so the extra class of nesting may be screwing something up. If this is the
case, then I would also expect replacing the accumulator with any other
reference to the enclosing scope (such as a broadcast variable) would have
the same result.
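
For context, the Scala compiler rewrites a for loop over a Range into a
foreach call that takes a function object, and that function object is the
extra layer of nesting mentioned above. The sketch below uses no Spark and
only shows that the three forms compute the same thing; ForDesugar and its
method names are hypothetical.

```scala
object ForDesugar {
  // The for loop as written in the examples in this thread.
  def sumWithFor(n: Int): Int = {
    var total = 0
    for (i <- 1 to n) { total += i }
    total
  }

  // What the compiler turns it into: the body becomes a function
  // object passed to Range.foreach.
  def sumWithForeach(n: Int): Int = {
    var total = 0
    (1 to n).foreach { i => total += i }
    total
  }

  // A while loop has no function object, so anything created in its
  // body is nested one level less deep.
  def sumWithWhile(n: Int): Int = {
    var total = 0
    var i = 1
    while (i <= n) { total += i; i += 1 }
    total
  }

  def main(args: Array[String]): Unit = {
    println(sumWithFor(10))
    println(sumWithForeach(10))
    println(sumWithWhile(10))
  }
}
```

All three return the same sum, but only the first two wrap the loop body in a
function object that an inner closure can end up referring to.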
