Posted to user@spark.apache.org by Aris <ar...@gmail.com> on 2016/02/22 23:29:34 UTC

Streaming mapWithState API has NullPointerException

Hello Spark community, and especially TD and Spark Streaming folks:

I am using the new Spark 1.6.0 Streaming mapWithState API in order to
accomplish a streaming join over my data.

Things work fine on smaller sets of data, but on a single-node cluster with a
larger data set -- JSON strings amounting to 2.5 GB -- problems start to
occur: I get a NullPointerException. It appears to happen in my code when I
call DataFrame.write.parquet().

I am reliably reproducing this, and it appears to be internal to
mapWithState. I don't know what else I can do to make progress -- any
thoughts?
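
For context, here is a minimal sketch of the kind of pipeline I am running --
the input source, key extraction, state-update logic, and output path below
are simplified stand-ins, not my actual code:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object MapWithStateSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mapWithState-repro")
    val ssc = new StreamingContext(conf, Seconds(30))
    ssc.checkpoint("/tmp/checkpoint")  // mapWithState requires checkpointing

    // Incoming JSON strings keyed by the join key (socket source is illustrative).
    val keyedJson = ssc.socketTextStream("localhost", 9999)
      .map(line => (extractKey(line), line))

    // Buffer records per key; emit a joined record once both sides have arrived.
    def updateState(key: String, value: Option[String],
                    state: State[List[String]]): Option[String] = {
      if (state.isTimingOut()) {
        None  // state is being dropped by the timeout; nothing to emit
      } else {
        val buffered = value.toList ++ state.getOption.getOrElse(Nil)
        if (buffered.size >= 2) { state.remove(); Some(buffered.mkString("|")) }
        else { state.update(buffered); None }
      }
    }

    val joined = keyedJson
      .mapWithState(StateSpec.function(updateState _).timeout(Seconds(600)))
      .flatMap(_.toList)

    // Write each non-empty batch as Parquet -- this is where the NPE shows up.
    joined.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._
        rdd.toDF("record").write.mode("append").parquet("/data/output")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  // Stand-in for the real key-extraction logic.
  def extractKey(json: String): String = json.take(8)
}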



Here is the stack trace:

16/02/22 22:03:54 ERROR Executor: Exception in task 1.0 in stage 4349.0
> (TID 6386)
> java.lang.NullPointerException
>         at
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.getByTime(StateMap.scala:117)
>         at
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.getByTime(StateMap.scala:117)
>         at
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:69)
>         at
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)



> 16/02/22 22:03:55 ERROR JobScheduler: Error running job streaming job
> 1456178580000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 12
> in stage 4349.0 failed 1 times, most recent failure: Lost task 12.0 in
> stage 4349.0 (TID 6397, localhost): java.lang.NullPointerException
>         at
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.getByTime(StateMap.scala:117)
>         at
> org.apache.spark.streaming.util.OpenHashMapBasedStateMap.getByTime(StateMap.scala:117)
>         at
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:69)
>         at
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>         at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>         at scala.Option.foreach(Option.scala:257)
>         at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>         at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>         at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>         at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>         at
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
>         at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1314)
>         at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>         at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>         at org.apache.spark.rdd.RDD.take(RDD.scala:1288)
>         at
> org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1416)
>         at
> org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1416)
>         at
> org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1416)
>         at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>         at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>         at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1415)
>         at
> com.company.denormalize.Implicits$DStreamMixologistRawSchema$$anonfun$outputParquet$1.apply(Implicits.scala:67)
>         at
> com.company.denormalize.Implicits$DStreamMixologistRawSchema$$anonfun$outputParquet$1.apply(Implicits.scala:47)
>         at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>         at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>         at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>         at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>         at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>         at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>         at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>         at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>         at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>         at scala.util.Try$.apply(Try.scala:192)
>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>         at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>         at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>         at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>

Re: Streaming mapWithState API has NullPointerException

Posted by Tathagata Das <ta...@gmail.com>.
Yes, you should be okay to test your code. :)

On Mon, Feb 22, 2016 at 5:57 PM, Aris <ar...@gmail.com> wrote:

> If I build from git branch origin/branch-1.6 will I be OK to test out my
> code?
>
> Thank you so much TD!
>
> Aris
>
> On Mon, Feb 22, 2016 at 2:48 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> There were a few bugs that were solved with mapWithState recently. Would
>> be available in 1.6.1 (RC to be cut soon).
>>
>> On Mon, Feb 22, 2016 at 5:29 PM, Aris <ar...@gmail.com> wrote:
>>
>>> [original message and stack trace snipped; quoted in full at the top of this thread]
>

Re: Streaming mapWithState API has NullPointerException

Posted by Aris <ar...@gmail.com>.
If I build from the git branch origin/branch-1.6, will I be OK to test out my
code?
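
For reference, this is roughly how I would point my project at the locally
built snapshot. The local-install step and the 1.6.1-SNAPSHOT version string
are my assumptions about how the branch-1.6 build would be consumed:

// build.sbt -- sketch for depending on a locally built branch-1.6 snapshot.
// Assumes the Spark checkout was installed to the local Maven repo first,
// e.g. `build/mvn -DskipTests install` from the branch-1.6 working tree,
// and that the branch versions itself as 1.6.1-SNAPSHOT.
scalaVersion := "2.10.5"

resolvers += Resolver.mavenLocal

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.6.1-SNAPSHOT" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.1-SNAPSHOT" % "provided",
  "org.apache.spark" %% "spark-sql"       % "1.6.1-SNAPSHOT" % "provided"
)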

Thank you so much TD!

Aris

On Mon, Feb 22, 2016 at 2:48 PM, Tathagata Das <ta...@gmail.com>
wrote:

> There were a few bugs that were solved with mapWithState recently. Would
> be available in 1.6.1 (RC to be cut soon).
>
> On Mon, Feb 22, 2016 at 5:29 PM, Aris <ar...@gmail.com> wrote:
>
>> [original message and stack trace snipped; quoted in full at the top of this thread]
>

Re: Streaming mapWithState API has NullPointerException

Posted by Tathagata Das <ta...@gmail.com>.
There were a few bugs in mapWithState that were fixed recently. The fixes will
be available in 1.6.1 (RC to be cut soon).

On Mon, Feb 22, 2016 at 5:29 PM, Aris <ar...@gmail.com> wrote:

> [original message and stack trace snipped; quoted in full at the top of this thread]