Posted to user@spark.apache.org by Shannon Quinn <sq...@gatech.edu> on 2014/11/18 19:58:55 UTC

Iterative transformations over RDD crashes in phantom reduce

Hi all,

This is somewhat related to my previous question (see
http://apache-spark-user-list.1001560.n3.nabble.com/Iterative-changes-to-RDD-and-broadcast-variables-tt19042.html
for additional context), but for all practical purposes it is its own
issue.

As in my previous question, I'm making iterative changes to an RDD, 
where each iteration depends on the results of the previous one. I've 
stripped down what was previously a loop to just be two sequential edits 
to try and nail down where the problem is. It looks like this:

index = 0
INDEX = sc.broadcast(index)
M = M.flatMap(func1).reduceByKey(func2)
M.foreach(debug_output)
index = 1
INDEX = sc.broadcast(index)
M = M.flatMap(func1)
M.foreach(debug_output)

M is basically a row-indexed matrix, where each index points to a 
dictionary (sparse matrix more or less, with some domain-specific 
modifications). This program crashes on the second-to-last (7th) line; 
the creepy part is that it says the crash happens in "func2" with the 
broadcast variable "INDEX" == 1 (it attempts to access an entry that 
doesn't exist in a dictionary of one of the rows).

How is that even possible? Am I missing something fundamental about how 
Spark works under the hood?

Thanks for your help!

Shannon

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Iterative transformations over RDD crashes in phantom reduce

Posted by Shannon Quinn <sq...@gatech.edu>.
Sorry everyone--turns out an oft-forgotten single line of code was 
required to make this work:

index = 0
INDEX = sc.broadcast(index)
M = M.flatMap(func1).reduceByKey(func2)
M.foreach(debug_output)
M.cache()  # <-- the added line
index = 1
INDEX = sc.broadcast(index)
M = M.flatMap(func1)
M.foreach(debug_output)

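Works as expected now, and I understand why it was failing before: 
without the cache, the second foreach() made Spark recompute M from 
scratch, including the reduceByKey(func2) step, and by then INDEX had 
been rebroadcast, so func2 was invoked with index == 1.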
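
For anyone who finds this thread later, here is a rough sketch of the 
effect in plain Python. It is a toy model, not Spark: LazyRDD, 
reduce_all and run are made-up names, INDEX is an ordinary global 
standing in for the broadcast variable, and the toy marks the dataset 
for caching before the first action. It does not model when PySpark 
serializes closures; it only shows that a lazily recomputed step sees 
whatever value the global holds at recompute time.

```python
from functools import reduce

INDEX = 0    # stands in for the value read through the broadcast variable
calls = []   # records the INDEX value seen by each func2 call


def func2(a, b):
    calls.append(INDEX)  # looked up when func2 runs, not when it was passed
    return a + b


class LazyRDD:
    """Hypothetical toy: stores a recipe and reruns it on every action."""

    def __init__(self, compute):
        self._compute = compute
        self._use_cache = False
        self._cached = None

    def map(self, f):
        # The child recipe pulls from the parent each time it is evaluated.
        return LazyRDD(lambda: [f(x) for x in self.collect()])

    def reduce_all(self, f):
        return LazyRDD(lambda: [reduce(f, self.collect())])

    def cache(self):
        self._use_cache = True
        return self

    def collect(self):
        if not self._use_cache:
            return self._compute()      # recompute the whole lineage
        if self._cached is None:
            self._cached = self._compute()
        return self._cached


def run(use_cache):
    global INDEX
    calls.clear()
    INDEX = 0
    m = LazyRDD(lambda: [1, 2, 3]).reduce_all(func2)
    if use_cache:
        m.cache()
    first = m.collect()                         # first action, INDEX == 0
    INDEX = 1
    second = m.map(lambda x: x * 10).collect()  # second action, INDEX == 1
    return first, second, sorted(set(calls))


print(run(use_cache=False))  # ([6], [60], [0, 1])
print(run(use_cache=True))   # ([6], [60], [0])
```

Without the cache, func2 runs again during the second action and sees 
INDEX == 1; with it, the reduced result is reused and func2 only ever 
sees INDEX == 0.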

On 11/18/14 2:02 PM, Shannon Quinn wrote:
> To clarify about what, precisely, is impossible: the crash happens 
> with INDEX == 1 in func2, but func2 is only called in the reduceByKey 
> transformation when INDEX == 0. And according to the output of the 
> foreach() in line 4, that reduceByKey(func2) works just fine. How is 
> it then invoked again with INDEX == 1 when there clearly isn't another 
> reduce call at line 7?


Re: Iterative transformations over RDD crashes in phantom reduce

Posted by Shannon Quinn <sq...@gatech.edu>.
To clarify about what, precisely, is impossible: the crash happens with 
INDEX == 1 in func2, but func2 is only called in the reduceByKey 
transformation when INDEX == 0. And according to the output of the 
foreach() in line 4, that reduceByKey(func2) works just fine. How is it 
then invoked again with INDEX == 1 when there clearly isn't another 
reduce call at line 7?
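
One plain-Python behavior that may be relevant to this question: a 
function that reads a global looks the name up each time it runs, so it 
sees the latest value rather than the value at the time the function 
was handed off. The sketch below is only an analogy and assumes nothing 
about Spark internals; reads_global, takes_index and pinned are 
hypothetical names, and the two-argument shape just mimics a reduce 
function.

```python
from functools import partial

INDEX = 0


def reads_global(a, b):
    # INDEX is resolved each time the function runs.
    return (INDEX, a + b)


def takes_index(index, a, b):
    # index is an ordinary argument, fixed by whoever builds the callable.
    return (index, a + b)


pinned = partial(takes_index, INDEX)  # captures the current value, 0

INDEX = 1  # rebinding the global afterwards

late = reads_global(1, 2)   # sees the new value
early = pinned(1, 2)        # still carries the value captured above

print(late)   # (1, 3)
print(early)  # (0, 3)
```

Passing the index explicitly, as pinned does, keeps a function tied to 
the value it was built with, whatever happens to the global later.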


