You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Dmitriy Lyubimov <dl...@gmail.com> on 2014/02/21 04:49:37 UTC

Custom RDD gets HadoopSplit for compute() call

Hello,

I have this weird error coming up that i am really at loss to explain.

I have defined a custom RDD ( reading from a 3rd party store, a really
simple one). In a same code, same test running locally (0.8.0) it
occasionally receives a compute() call with a HadoopSplit split (instead of
its native split). Naturally, it fails on the cast call.

I don't see any fundamental difference between my code and HadoopRDD or
JdbcRDD.

I managed to reproduce this with the stack frame below with local task
scheduler. But i am really at loss to explain what's going on here.




   - "pool-2-thread-3"@8,430 in group "main": RUNNING
   - org$hbl$whale$plan$spark$HblRDD$$toHblPartition():41, HblRDD$
   {org.hbl.whale.plan.spark}
   - compute():19, HblRDD {org.hbl.whale.plan.spark}
   - computeOrReadCheckpoint():237, RDD {org.apache.spark.rdd}
   - getOrCompute():70, CacheManager {org.apache.spark}
   - iterator():224, RDD {org.apache.spark.rdd}
   - compute():32, FilteredRDD {org.apache.spark.rdd}
   - computeOrReadCheckpoint():237, RDD {org.apache.spark.rdd}
   - iterator():226, RDD {org.apache.spark.rdd}
   - compute():29, MappedRDD {org.apache.spark.rdd}
   - computeOrReadCheckpoint():237, RDD {org.apache.spark.rdd}
   - iterator():226, RDD {org.apache.spark.rdd}
   - compute():36, MapPartitionsRDD {org.apache.spark.rdd}
   - computeOrReadCheckpoint():237, RDD {org.apache.spark.rdd}
   - iterator():226, RDD {org.apache.spark.rdd}
   - run():149, ShuffleMapTask {org.apache.spark.scheduler}
   - run():88, ShuffleMapTask {org.apache.spark.scheduler}
   - runTask():198, LocalScheduler {org.apache.spark.scheduler.local}
   - run():68, LocalActor$$anonfun$launchTask$1$$anon$1
   {org.apache.spark.scheduler.local}
   - call():471, Executors$RunnableAdapter {java.util.concurrent}
   - run():262, FutureTask {java.util.concurrent}
   - runWorker():1145, ThreadPoolExecutor {java.util.concurrent}
   - run():615, ThreadPoolExecutor$Worker {java.util.concurrent}
   - run():744, Thread {java.lang}

Thank you in advance for your help.
-Dmitriy

Re: Custom RDD gets HadoopSplit for compute() call

Posted by Dmitriy Lyubimov <dl...@gmail.com>.
ah, no worries. It seems another engineer created multiple spark sessions
which created those weird race conditions.


On Thu, Feb 20, 2014 at 7:57 PM, Dmitriy Lyubimov <dl...@gmail.com> wrote:

> PS the file it is traing to read from seems to be
> ./indata1392953916265:0+434205 which seems to be a sequence file of (Int,
> BytesWritable).
> So it looks like it is some sort of a shuffle spill.
>
> However, why a custom RDD is supposed to support reads from a shuffle
> spill? any ideas are very much appreciated.
> -Dmitriy
>
>
> On Thu, Feb 20, 2014 at 7:49 PM, Dmitriy Lyubimov <dl...@gmail.com>wrote:
>
>> Hello,
>>
>> I have this weird error coming up that i am really at loss to explain.
>>
>> I have defined a custom RDD ( reading from a 3rd party store, a really
>> simple one). In a same code, same test running locally (0.8.0) it
>> occasionally receives a compute() call with a HadoopSplit split (instead of
>> its native split). Naturally, it fails on the cast call.
>>
>> I don't see any fundamental difference between my code and HadoopRDD or
>> JdbcRDD.
>>
>> I managed to reproduce this with the stack frame below with local task
>> scheduler. But i am really at loss to explain what's going on here.
>>
>>
>>
>>
>>    - "pool-2-thread-3"@8,430 in group "main": RUNNING
>>    - org$hbl$whale$plan$spark$HblRDD$$toHblPartition():41, HblRDD$
>>    {org.hbl.whale.plan.spark}
>>    - compute():19, HblRDD {org.hbl.whale.plan.spark}
>>    - computeOrReadCheckpoint():237, RDD {org.apache.spark.rdd}
>>    - getOrCompute():70, CacheManager {org.apache.spark}
>>    - iterator():224, RDD {org.apache.spark.rdd}
>>    - compute():32, FilteredRDD {org.apache.spark.rdd}
>>    - computeOrReadCheckpoint():237, RDD {org.apache.spark.rdd}
>>    - iterator():226, RDD {org.apache.spark.rdd}
>>    - compute():29, MappedRDD {org.apache.spark.rdd}
>>    - computeOrReadCheckpoint():237, RDD {org.apache.spark.rdd}
>>    - iterator():226, RDD {org.apache.spark.rdd}
>>    - compute():36, MapPartitionsRDD {org.apache.spark.rdd}
>>    - computeOrReadCheckpoint():237, RDD {org.apache.spark.rdd}
>>    - iterator():226, RDD {org.apache.spark.rdd}
>>    - run():149, ShuffleMapTask {org.apache.spark.scheduler}
>>    - run():88, ShuffleMapTask {org.apache.spark.scheduler}
>>    - runTask():198, LocalScheduler {org.apache.spark.scheduler.local}
>>    - run():68, LocalActor$$anonfun$launchTask$1$$anon$1
>>    {org.apache.spark.scheduler.local}
>>    - call():471, Executors$RunnableAdapter {java.util.concurrent}
>>    - run():262, FutureTask {java.util.concurrent}
>>    - runWorker():1145, ThreadPoolExecutor {java.util.concurrent}
>>    - run():615, ThreadPoolExecutor$Worker {java.util.concurrent}
>>    - run():744, Thread {java.lang}
>>
>> Thank you in advance for your help.
>> -Dmitriy
>>
>
>

Re: Custom RDD gets HadoopSplit for compute() call

Posted by Dmitriy Lyubimov <dl...@gmail.com>.
PS the file it is traing to read from seems to be
./indata1392953916265:0+434205 which seems to be a sequence file of (Int,
BytesWritable).
So it looks like it is some sort of a shuffle spill.

However, why a custom RDD is supposed to support reads from a shuffle
spill? any ideas are very much appreciated.
-Dmitriy


On Thu, Feb 20, 2014 at 7:49 PM, Dmitriy Lyubimov <dl...@gmail.com> wrote:

> Hello,
>
> I have this weird error coming up that i am really at loss to explain.
>
> I have defined a custom RDD ( reading from a 3rd party store, a really
> simple one). In a same code, same test running locally (0.8.0) it
> occasionally receives a compute() call with a HadoopSplit split (instead of
> its native split). Naturally, it fails on the cast call.
>
> I don't see any fundamental difference between my code and HadoopRDD or
> JdbcRDD.
>
> I managed to reproduce this with the stack frame below with local task
> scheduler. But i am really at loss to explain what's going on here.
>
>
>
>
>    - "pool-2-thread-3"@8,430 in group "main": RUNNING
>    - org$hbl$whale$plan$spark$HblRDD$$toHblPartition():41, HblRDD$
>    {org.hbl.whale.plan.spark}
>    - compute():19, HblRDD {org.hbl.whale.plan.spark}
>    - computeOrReadCheckpoint():237, RDD {org.apache.spark.rdd}
>    - getOrCompute():70, CacheManager {org.apache.spark}
>    - iterator():224, RDD {org.apache.spark.rdd}
>    - compute():32, FilteredRDD {org.apache.spark.rdd}
>    - computeOrReadCheckpoint():237, RDD {org.apache.spark.rdd}
>    - iterator():226, RDD {org.apache.spark.rdd}
>    - compute():29, MappedRDD {org.apache.spark.rdd}
>    - computeOrReadCheckpoint():237, RDD {org.apache.spark.rdd}
>    - iterator():226, RDD {org.apache.spark.rdd}
>    - compute():36, MapPartitionsRDD {org.apache.spark.rdd}
>    - computeOrReadCheckpoint():237, RDD {org.apache.spark.rdd}
>    - iterator():226, RDD {org.apache.spark.rdd}
>    - run():149, ShuffleMapTask {org.apache.spark.scheduler}
>    - run():88, ShuffleMapTask {org.apache.spark.scheduler}
>    - runTask():198, LocalScheduler {org.apache.spark.scheduler.local}
>    - run():68, LocalActor$$anonfun$launchTask$1$$anon$1
>    {org.apache.spark.scheduler.local}
>    - call():471, Executors$RunnableAdapter {java.util.concurrent}
>    - run():262, FutureTask {java.util.concurrent}
>    - runWorker():1145, ThreadPoolExecutor {java.util.concurrent}
>    - run():615, ThreadPoolExecutor$Worker {java.util.concurrent}
>    - run():744, Thread {java.lang}
>
> Thank you in advance for your help.
> -Dmitriy
>