Posted to user@spark.apache.org by Adam Westerman <as...@gmail.com> on 2016/05/06 13:57:51 UTC

getting NullPointerException while doing left outer join

Hi,

I’m attempting to do a left outer join in Spark, and I’m getting an NPE
that appears to be caused by a bug in the Spark Java API.  (I’m running
Spark 1.6.0 in local mode on a Mac.)

For a little background, a left outer join returns every key from the left
side of the join regardless of whether the key is present on the right
side.  To handle this uncertainty, the value from the right side is
wrapped in Guava’s Optional class.  Optional has a method to check whether
a value is present (which would indicate the key appeared in both RDDs
being joined); if it is, you can retrieve the value and move on.

After doing a little digging, I found that Spark uses Scala’s Option
internally.  This is the same concept as Guava’s Optional, only native to
Scala.  During the conversion from a Scala Option back to a Guava Optional
(the method can be found here:
https://github.com/apache/spark/blob/v1.6.0/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala#L28)
it appears the conversion method is erroneously passed a Scala Option
whose value the debugger shows as the String “None”, rather than Scala’s
None object.  That Option matches the first *case* in the method, so
Guava’s Optional.of tries to pull the value out, and an NPE is thrown
because the value was never actually there.

The code basically looks like this, where the classes used are just plain
Java objects with some fields inside:
// First RDD
JavaPairRDD<GroupItemNode, WeekItemComposite> rdd1;
// Second RDD
JavaPairRDD<GroupItemNode, Inventory> rdd2;

// Resultant RDD
JavaPairRDD<GroupItemNode, Tuple2<WeekItemComposite, Optional<Inventory>>> result =
    rdd1.leftOuterJoin(rdd2);
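After the join, each right-side value comes back wrapped in a Guava
Optional, so the consuming code checks isPresent() before calling get().
A minimal sketch of how the result might be consumed (assuming Java 8
lambdas; the empty-Inventory default is made up just to show the pattern):

import com.google.common.base.Optional;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Collapse the Optional so downstream code never sees a missing value.
JavaPairRDD<GroupItemNode, Tuple2<WeekItemComposite, Inventory>> withDefaults =
    result.mapValues(pair -> {
        Optional<Inventory> maybeInventory = pair._2();
        // isPresent() is true only when the key also existed in rdd2
        Inventory inventory = maybeInventory.isPresent()
            ? maybeInventory.get()
            : new Inventory();  // hypothetical "empty" default
        return new Tuple2<>(pair._1(), inventory);
    });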

Has anyone encountered this problem before, or know why the
optionToOptional method might be getting passed this “None” value?  I’ve
added some more relevant information below; let me know if I can provide
any more details.

Here's a screenshot showing the string value of “None” being passed into
the optionToOptional method using the debugger:

Here’s the stack trace (the method shown above is highlighted):

ERROR 13:17:00,743 com.tgt.allocation.needengine.NeedEngineApplication Exception while running need engine:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 31.0 failed 1 times, most recent failure: Lost task 8.0 in stage 31.0 (TID 50, localhost): java.lang.NullPointerException
at org.spark-project.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
at com.google.common.base.Optional.of(Optional.java:86)
at org.apache.spark.api.java.JavaUtils$.optionToOptional(JavaUtils.scala:30)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at org.apache.spark.rdd.RDD.count(RDD.scala:1143)
at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:440)
at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:46)
at com.tgt.allocation.needengine.spark.processor.NeedEngineProcessor.runProcessor(NeedEngineProcessor.java:43)
at com.tgt.allocation.needengine.spark.processor.SparkProcessor.runProcessor(SparkProcessor.java:68)
at com.tgt.allocation.needengine.service.NeedEngineService.runProcessor(NeedEngineService.java:47)
at com.tgt.allocation.needengine.NeedEngineApplication.main(NeedEngineApplication.java:29)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.NullPointerException
at org.spark-project.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
at com.google.common.base.Optional.of(Optional.java:86)
at org.apache.spark.api.java.JavaUtils$.optionToOptional(JavaUtils.scala:30)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$leftOuterJoin$2.apply(JavaPairRDD.scala:564)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:755)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
WARN 13:17:00,744 org.apache.spark.Logging$class Lost task 9.0 in stage 31.0 (TID 51, localhost): TaskKilled (killed intentionally)

Thank you for any help you may be able to provide,

Adam Westerman

Re: getting NullPointerException while doing left outer join

Posted by Adam Westerman <as...@gmail.com>.
For anyone interested, the problem ended up being that in some rare cases
the value in the pair RDD on the right side of the left outer join was
Java's null.  The join wrapped that null in Some(), and Spark's
optionToOptional method then called Guava's Optional.of(null), which is
what threw the NPE.
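To make the failure mode concrete, here is a minimal sketch of a repro
(the String key/value types and the local context are made up for
illustration; the only thing that matters is the null value on the right
side):

import java.util.Arrays;

import com.google.common.base.Optional;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

JavaSparkContext sc = new JavaSparkContext("local", "npe-repro");

// Left side: one key with an ordinary value.
JavaPairRDD<String, String> left =
    sc.parallelizePairs(Arrays.asList(new Tuple2<>("k1", "leftValue")));

// Right side: the same key, but its value is null.
JavaPairRDD<String, String> right =
    sc.parallelizePairs(Arrays.asList(new Tuple2<>("k1", (String) null)));

// The join produces Some(null) for k1; converting that to Guava's
// Optional calls Optional.of(null), which throws the NullPointerException.
JavaPairRDD<String, Tuple2<String, Optional<String>>> joined =
    left.leftOuterJoin(right);
joined.count();  // the NPE surfaces once an action forces the join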

The lesson is to filter out any null values before doing an outer join.
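Continuing the sketch above, filtering the nulls out of the right-side RDD
before the join avoids the problem; the key then simply comes back with
Optional.absent():

// Drop pairs whose value is null before joining; keys that only had a
// null value on the right now come back as Optional.absent().
JavaPairRDD<String, String> rightNonNull =
    right.filter(pair -> pair._2() != null);

JavaPairRDD<String, Tuple2<String, Optional<String>>> safeJoin =
    left.leftOuterJoin(rightNonNull);
safeJoin.count();  // no NPE; k1 maps to ("leftValue", Optional.absent())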

-Adam

On Fri, May 6, 2016 at 10:45 AM, Adam Westerman <as...@gmail.com> wrote:

> Hi Ted,
>
> I am working on replicating the problem on a smaller scale.
>
> I saw that Spark 2.0 is moving to Java 8 Optional instead of Guava
> Optional, but in the meantime I'm stuck with 1.6.1.
>
> -Adam
>
> On Fri, May 6, 2016 at 9:40 AM, Ted Yu <yu...@gmail.com> wrote:
>
>> Is it possible to write a short test which exhibits this problem ?
>>
>> For Spark 2.0, this part of code has changed:
>>
>> [SPARK-4819] Remove Guava's "Optional" from public API
>>
>> FYI

Re: getting NullPointerException while doing left outer join

Posted by Adam Westerman <as...@gmail.com>.
Hi Ted,

I am working on replicating the problem on a smaller scale.

I saw that Spark 2.0 is moving to Java 8 Optional instead of Guava
Optional, but in the meantime I'm stuck with 1.6.1.

-Adam

On Fri, May 6, 2016 at 9:40 AM, Ted Yu <yu...@gmail.com> wrote:

> Is it possible to write a short test which exhibits this problem ?
>
> For Spark 2.0, this part of code has changed:
>
> [SPARK-4819] Remove Guava's "Optional" from public API
>
> FYI

Re: getting NullPointerException while doing left outer join

Posted by Ted Yu <yu...@gmail.com>.
Is it possible to write a short test which exhibits this problem?

For Spark 2.0, this part of the code has changed:

[SPARK-4819] Remove Guava's "Optional" from public API

FYI
