Posted to user@spark.apache.org by Ian Bonnycastle <ib...@gmail.com> on 2014/04/14 18:17:08 UTC

reduceByKey issue in example wordcount (scala)

Good afternoon,

I'm attempting to get the wordcount example working, and I keep getting an
error in the "reduceByKey(_ + _)" call. I've scoured the mailing lists and
haven't been able to find a surefire solution, unless I'm missing
something big. I did find something close, but it didn't appear to work in
my case. The error is:

org.apache.spark.SparkException: Job aborted: Task 2.0:3 failed 4 times (most recent failure: Exception failure: java.lang.ClassNotFoundException: SimpleApp$$anonfun$3)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I've commented the reduceByKey line out and back in to make sure it was the
cause, and it is. If I take it out, my script compiles and runs without
problems. If I put it in, I get the above error across all my nodes. The
spark-shell does process the line properly, so I assumed I was missing an
"import" statement. The only report I could find that was anywhere close to
my error was from someone for whom an "import
scala.collection.JavaConversions._" fixed the problem. That didn't work for
me. Can anyone shed some light on this? I'm pulling out my hair trying to
figure out what I'm missing.

The section of code I'm trying to get to work is:

    val JCountRes = logData.flatMap(line => line.split(" "))
                           .map(word => (word, 1))
                           .reduceByKey(_ + _)

"logData" is just an RDD pointing to a large (2 GB) file in HDFS.

Thanks,

Ian

Re: reduceByKey issue in example wordcount (scala)

Posted by Ian Bonnycastle <ib...@gmail.com>.
I just wanted to let you know, Marcelo, and others who may run into this
error in the future... I figured it out!

When I first started to work on my scripts, I was using "sbt/sbt package"
followed by "sbt/sbt run". But when I saw that "sbt/sbt run" compiled the
script itself, I gave up on "sbt/sbt package", not realizing that it is the
step that builds the jar distributed to all the nodes. It turned out that
the jar I was using with my script was very old and did not contain the
.class file required by the program, hence the ClassNotFoundException.

So, in the future, always run "sbt/sbt package" before "sbt/sbt run".
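
One way to catch a stale jar before submitting is to check that it actually
contains the class named in the exception. The following is only a sketch
using the JDK's java.util.jar.JarFile; JarCheck and jarContainsClass are
hypothetical names, not part of Spark or sbt, and the jar path and class
name are supplied by the caller.

```scala
import java.util.jar.JarFile

object JarCheck {
  // Returns true if the jar at jarPath has a .class entry for className.
  // (Hypothetical helper for illustration; not a Spark or sbt API.)
  def jarContainsClass(jarPath: String, className: String): Boolean = {
    val jar = new JarFile(jarPath)
    try {
      // A class "a.b.C$$anonfun$3" is stored as the entry "a/b/C$$anonfun$3.class"
      jar.getEntry(className.replace('.', '/') + ".class") != null
    } finally {
      jar.close()
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length == 2) {
      // args(0): path to the application jar, args(1): class name from the exception
      println(jarContainsClass(args(0), args(1)))
    } else {
      println("usage: JarCheck <path-to-jar> <class-name>")
    }
  }
}
```

The same entries can be listed by hand with the JDK's "jar tf <path-to-jar>".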

Thank you for all your help, Marcelo.

Ian



Re: reduceByKey issue in example wordcount (scala)

Posted by Ian Bonnycastle <ib...@gmail.com>.
Hi Marcelo,

Changing it to null didn't make any difference at all. /usr/local/pkg/spark
is also on all the nodes; it has to be in order to get all the nodes up
and running in the cluster. Also, I'm confused by what you mean by
"That's most probably the class that implements the closure you're passing
as an argument to the reduceByKey() method". The only thing I'm passing to
it is "_ + _", and as you mentioned, it's pretty much the same as the map()
method.

If I run the following code, it runs 100% properly on the cluster:

    val numAs = logData.filter(line => line.contains("a")).count()

So, this is a closure to the filter() method, and it doesn't have any
problems at all. Also, if I run the reduceByKey in local mode, it runs
perfectly. So, as you mentioned, it almost sounds like the code, or the
closure, is not getting to all the nodes properly. But why reduceByKey is
the only method affected is beyond me.
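
Since the pipeline runs correctly in local mode, the logic itself is
probably not in question. As a rough illustration of what the three steps
compute, here is a standard-library sketch with no Spark involved: groupBy
followed by reduce stands in for reduceByKey, and LocalWordCount and
wordCount are hypothetical names used only for this example.

```scala
object LocalWordCount {
  // Counts space-separated words using plain Scala collections.
  // groupBy + reduce is a local stand-in for Spark's reduceByKey(_ + _).
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(" "))   // one element per word
      .map(word => (word, 1))             // pair each word with a count of 1
      .groupBy { case (word, _) => word } // collect the pairs for each word
      .map { case (word, pairs) => (word, pairs.map(_._2).reduce(_ + _)) }

  def main(args: Array[String]): Unit =
    println(wordCount(Seq("a b a", "b c")))
}
```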

Ian



Re: reduceByKey issue in example wordcount (scala)

Posted by Marcelo Vanzin <va...@cloudera.com>.
Hi Ian,

On Mon, Apr 14, 2014 at 11:30 AM, Ian Bonnycastle <ib...@gmail.com> wrote:
>     val sc = new SparkContext("spark://<masternodeip>:7077",
>                               "Simple App", "/usr/local/pkg/spark",
>              List("target/scala-2.10/simple-project_2.10-1.0.jar"))

Hmmm... does /usr/local/pkg/spark exist on all the worker nodes? (I
haven't particularly tried using the sparkHome argument myself, nor
have I traced through the code to see exactly what it does, but...).
I'd try setting the "sparkHome" argument to null and see if that
helps. (It has been working for me without it.) Since you're already
listing your app's jar file there, you don't need to explicitly call
addJar().

Note that the class that isn't being found is not a Spark class, it's
a class from your app (SimpleApp$$anonfun$3). That's most probably the
class that implements the closure you're passing as an argument to the
reduceByKey() method. Although I can't really explain why the same
isn't happening for the closure you're passing to map()...
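
To illustrate the point about closure classes: each function literal in
the word-count pipeline is its own object with its own runtime class. With
the Scala 2.10 compiler used in this thread, those classes are written to
the jar as files named like Enclosing$$anonfun$N; newer compilers generate
and name them differently, so no output is shown here. ClosureClasses and
the value names below are hypothetical, chosen only for this sketch.

```scala
object ClosureClasses {
  // The three function literals from the word-count pipeline, bound to names
  val split: String => Array[String] = line => line.split(" ")
  val pair: String => (String, Int) = word => (word, 1)
  val add: (Int, Int) => Int = _ + _

  def main(args: Array[String]): Unit = {
    // Print the runtime class behind each closure; the exact names depend
    // on the Scala compiler version
    Seq[AnyRef](split, pair, add).foreach(f => println(f.getClass.getName))
  }
}
```

Every one of these classes has to be loadable on the workers, which is why
a jar built before a closure was added can fail on just that one call.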

Sorry I can't be more helpful.




-- 
Marcelo

Re: reduceByKey issue in example wordcount (scala)

Posted by Ian Bonnycastle <ib...@gmail.com>.
Hi Marcelo, thanks for answering.

That didn't seem to help. I have the following now:

    val sc = new SparkContext("spark://<masternodeip>:7077",
                              "Simple App", "/usr/local/pkg/spark",
                              List("target/scala-2.10/simple-project_2.10-1.0.jar"))

    sc.addJar("/home/spark/workspace/SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar")

I still get the ClassNotFoundException, though, unless I'm misunderstanding
how to use sc.addJar. I find it a little weird, too, that the Spark
platform has trouble finding code that is part of the application itself.
And why does it occur only with the reduceByKey function? I have no
problems running any other code. (BTW, I don't use <masternodeip> in my
actual code... I just removed the real address for security purposes.)

Thanks,

Ian




Re: reduceByKey issue in example wordcount (scala)

Posted by Marcelo Vanzin <va...@cloudera.com>.
Hi Ian,

When you run your packaged application, are you adding its jar file to
the SparkContext (by calling the addJar() method)?

That will distribute the code to all the worker nodes. The failure
you're seeing seems to indicate the worker nodes do not have access to
your code.


-- 
Marcelo