Posted to user@spark.apache.org by hide <x2...@gmail.com> on 2015/08/14 13:05:22 UTC
Spark job ends up with NPE
Hello,
I'm using Spark on a YARN cluster with mongo-hadoop-connector to pull
data into Spark and run a job.
The job has the following stages:
(flatMap -> flatMap -> reduceByKey -> sortByKey)
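To make the stage chain concrete, here is a minimal sketch that mimics flatMap -> reduceByKey -> sortByKey on plain Scala collections instead of an RDD, so it runs without a cluster. It is only an analogue under stated assumptions: the tweets are hypothetical sample strings, the post does not show what the second flatMap does so it is omitted, and groupBy plus a sum stands in for reduceByKey.

```scala
// Hypothetical sample tweets standing in for the "text" field of each MongoDB document.
val tweets = Seq("Apache spark is Lightning-fast", "hello my spark", "Hi aaa")

// flatMap analogue: split each tweet into words on runs of whitespace.
val words = tweets.flatMap(_.split("\\s+").toList)

// reduceByKey analogue: pair each word with 1, group identical words, and sum the counts.
val counts: Map[String, Int] =
  words.map(w => (w, 1)).groupBy(_._1).map { case (w, pairs) => (w, pairs.map(_._2).sum) }

// sortByKey analogue: order the (word, count) pairs by the word.
val sorted: Seq[(String, Int)] = counts.toSeq.sortBy(_._1)

sorted.foreach(println)
```

Note that the default String ordering is by character code, so capitalized words sort before lowercase ones, which is also how sortByKey orders String keys by default.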
The data in MongoDB consists of tweets from Twitter.
First, I connect to MongoDB and create an RDD as follows:
import org.bson.BSONObject

val mongoRDD = sc.newAPIHadoopRDD(mongoConfig,
  classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object],
  classOf[BSONObject])
I set "mongo.input.fields" as below so that only _id and text are fetched:
mongoConfig.set("mongo.input.fields", "{\"_id\": 1, \"text\" : 1}")
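As a side note on this projection, a Scala triple-quoted string expresses the same JSON without backslash escapes. Keep in mind that a MongoDB projection only selects fields; it does not filter documents, so a document that has no text field is still returned, just without that key. The snippet below only checks that the two spellings are identical; the name fieldsProjection is illustrative.

```scala
// The escaped form used in the post.
val escaped = "{\"_id\": 1, \"text\" : 1}"

// The same JSON as a triple-quoted string: double quotes need no backslashes here.
val fieldsProjection = """{"_id": 1, "text" : 1}"""

// Both strings contain exactly the same characters.
println(escaped == fieldsProjection)
// It could then be passed as before: mongoConfig.set("mongo.input.fields", fieldsProjection)
```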
The data inside mongoRDD looks like this:
(558baf...,
 { "_id" : { "$oid" : "558baf…"} , "text" : "Apache spark is Lightning-fast cluster …" })
(558baf...,
 { "_id" : { "$oid" : "558baf…"} , "text" : "hello, my …" })
(558baf...,
 { "_id" : { "$oid" : "558baf…"} , "text" : "Hi, aaa …" })
In the next stage, I use flatMap to get the "text" element (the tweet) and
divide it into words:
val wordRDD = mongoRDD.flatMap(arg => {
  val str = arg._2.get("text").toString
  // Use a tokenizer to divide the tweet into words, or just split it on whitespace.
  str.split("\\s+")
})
After this, wordRDD looks like:
("Apache", "spark", "is", "Lightning-fast", "cluster", "hello", "my", ......)
When I try to print every element in wordRDD, I get the following error. I
know that the tweets can contain newline characters, spaces, or tabs, but what
causes this NPE?
Does "Iterator$$anon$13.hasNext" in the error mean that the job is iterating
through the RDD and the next value is null?
15/08/13 22:15:14 INFO scheduler.TaskSetManager: Starting task 2766.3 in stage 14.0 (TID 11136, iot-spark02, RACK_LOCAL, 1951 bytes)
15/08/13 22:18:53 WARN scheduler.TaskSetManager: Lost task 2766.3 in stage 14.0 (TID 11136, iot-spark02): java.lang.NullPointerException
    at $line129.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:85)
    at $line129.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:73)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
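The shape of this trace can be reproduced without Spark. Here $$anonfun$1.apply sits directly on top of Iterator$$anon$13.hasNext, which most likely means the exception is thrown inside the flatMap function itself: Scala's Iterator.flatMap applies the mapping function lazily from hasNext, and the two hasNext frames at the same source line are probably the two chained flatMap iterators. So the RDD is probably not yielding a null element; rather, some input makes the function dereference null. The sketch below is a local illustration, not the poster's code: a java.util.HashMap stands in for BSONObject (the usual BSONObject implementations also return null from get for a missing key), and the documents and the doc helper are hypothetical.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

// Hypothetical helper: builds a document as a Java map, standing in for a BSONObject.
def doc(fields: (String, AnyRef)*): JMap[String, AnyRef] = {
  val m = new JHashMap[String, AnyRef]()
  fields.foreach { case (k, v) => m.put(k, v) }
  m
}

// Hypothetical input: the second document has no "text" field at all.
val docs = Seq(doc("_id" -> "a", "text" -> "hello world"), doc("_id" -> "b"))

// Same shape as the post's function: get("text").toString, then split on whitespace.
// Iterator.flatMap is lazy, so defining `words` does not call the function yet.
val words = docs.iterator.flatMap(d => d.get("text").toString.split("\\s+").toList)

// Consuming the iterator calls hasNext, which applies the function to each document in turn.
// For the second one get("text") returns null, and calling toString on it throws.
val failure: Option[NullPointerException] =
  try { words.foreach(println); None }
  catch { case e: NullPointerException => Some(e) }

// The stack trace contains a hasNext frame, as in the executor trace above.
println(failure.exists(_.getStackTrace.exists(_.getMethodName == "hasNext")))
```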
Can I avoid this error by wrapping each word in scala.Option?
If anybody knows why this happens, please help me.
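On the scala.Option question: assuming the NPE comes from arg._2.get("text") returning null for a document without a text field (a likely cause that the thread does not confirm), wrapping each word would not help, because the null appears before any words exist. Wrapping the get("text") result is a closer fit: Option(x) is None when x is null, so flatMap can emit no words for that document. A minimal sketch, with a java.util.HashMap standing in for BSONObject and hypothetical documents; the doc and tokenize helpers are illustrative names.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

// Hypothetical helper: builds a document as a Java map, standing in for a BSONObject.
def doc(fields: (String, AnyRef)*): JMap[String, AnyRef] = {
  val m = new JHashMap[String, AnyRef]()
  fields.foreach { case (k, v) => m.put(k, v) }
  m
}

// Null-safe tokenizer: Option(null) is None, so a document without usable text yields no words.
// trim and filter(_.nonEmpty) avoid empty tokens from leading whitespace or an empty string.
def tokenize(d: JMap[String, AnyRef]): List[String] =
  Option(d.get("text"))
    .map(_.toString.trim)
    .toList
    .flatMap(_.split("\\s+").toList)
    .filter(_.nonEmpty)

// Hypothetical input: tabs and newlines inside the text, a missing field, and an explicit null.
val docs = Seq(
  doc("_id" -> "a", "text" -> "Apache spark is\tLightning-fast\ncluster"),
  doc("_id" -> "b"),
  doc("_id" -> "c", "text" -> null)
)

val words = docs.flatMap(tokenize)
println(words)
```

In the Spark job the same idea would read roughly mongoRDD.flatMap(arg => ...) with the Option-wrapped get("text") inside; adapting tokenize to take a BSONObject is left as an untested assumption here.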
Thanks,
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-endup-with-NPE-tp24264.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org
Re: Spark job ends up with NPE
Posted by Akhil Das <ak...@sigmoidanalytics.com>.
You can put a try..catch around all the transformations that you are doing
and catch such exceptions instead of crashing your entire job.
Thanks
Best Regards
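For this suggestion to skip bad records, the try/catch has to go inside the function passed to the transformation: the exception is thrown on the executors while tasks run, so a try/catch around the driver-side calls could at best catch the job failure raised when an action runs, not skip the offending record. A minimal sketch with plain collections and hypothetical inputs in place of the RDD; it catches only NullPointerException, and even so it silently drops every record that throws, so counting or logging those records may be worth considering.

```scala
// Hypothetical raw values standing in for arg._2.get("text"); the second one is null.
val texts: Seq[AnyRef] = Seq("hello world", null, "Hi aaa")

// try/catch inside the function: a record whose processing throws an NPE contributes no words,
// and the remaining records are still processed.
val words = texts.flatMap { t =>
  try { t.toString.split("\\s+").toList } catch { case _: NullPointerException => Nil }
}

println(words)
```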