Posted to issues@spark.apache.org by "Nicholas Chammas (JIRA)" <ji...@apache.org> on 2016/01/14 17:20:39 UTC

[jira] [Commented] (SPARK-12824) Failure to maintain consistent RDD references in pyspark

    [ https://issues.apache.org/jira/browse/SPARK-12824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15098336#comment-15098336 ] 

Nicholas Chammas commented on SPARK-12824:
------------------------------------------

I can reproduce this issue. Here's a more concise reproduction:

{code}
from __future__ import print_function

rdd = sc.parallelize([
    {'color': 'red', 'size': 3},
    {'color': 'red', 'size': 7},
    {'color': 'red', 'size': 8},
    {'color': 'red', 'size': 10},
    {'color': 'green', 'size': 9},
    {'color': 'green', 'size': 5},
    {'color': 'green', 'size': 50},
    {'color': 'blue', 'size': 4},
    {'color': 'purple', 'size': 6}])


colors = ['purple', 'red', 'green', 'blue']

# Defer collect() till print
color_rdds = {
    color: rdd.filter(lambda x: x['color'] == color)
    for color in colors
}
for k, v in color_rdds.items():
    print(k, v.collect())


# collect() upfront
color_rdds = {
    color: rdd.filter(lambda x: x['color'] == color).collect()
    for color in colors
}
for k, v in color_rdds.items():
    print(k, v)
{code}

Output:

{code}
# Defer collect() till print
purple [{'color': 'blue', 'size': 4}]
blue [{'color': 'blue', 'size': 4}]
green [{'color': 'blue', 'size': 4}]
red [{'color': 'blue', 'size': 4}]

---

# collect() upfront
purple [{'color': 'purple', 'size': 6}]
blue [{'color': 'blue', 'size': 4}]
green [{'color': 'green', 'size': 9}, {'color': 'green', 'size': 5}, {'color': 'green', 'size': 50}]
red [{'color': 'red', 'size': 3}, {'color': 'red', 'size': 7}, {'color': 'red', 'size': 8}, {'color': 'red', 'size': 10}]
{code}

Observations:
* The color that gets repeated in the first block of output is always the last color in {{colors}}.
* This happens on Python 2 and 3, and with both {{items()}} and {{iteritems()}}.

This smells like Python's late-binding closures interacting with lazy evaluation, rather than an RDD naming issue. Each {{lambda}} in the first block closes over the loop variable {{color}} itself, not over its value at definition time. Since {{filter()}} is lazy, the lambdas aren't evaluated until {{collect()}} runs, and by then the comprehension has finished and {{color}} holds its last value ({{'blue'}}), so every filtered RDD compares against that last color. Collecting inside the comprehension evaluates each lambda while {{color}} still has the intended value, which is why the second block behaves correctly.
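
For what it's worth, here's a minimal sketch of the same late-binding behavior in plain Python, with no Spark involved ({{predicates}} is just an illustrative name):

{code}
colors = ['purple', 'red', 'green', 'blue']

# Each lambda closes over the variable 'color', not its value at definition
# time, so after the comprehension finishes they all see color == 'blue'.
predicates = {color: (lambda x: x['color'] == color) for color in colors}
print(predicates['purple']({'color': 'blue'}))  # True -- wrong predicate

# Workaround: bind the current value as a default argument at definition time.
predicates = {color: (lambda x, c=color: x['color'] == c) for color in colors}
print(predicates['purple']({'color': 'blue'}))  # False, as intended
{code}

Applying the same default-argument binding in the reproduction above, e.g. {{rdd.filter(lambda x, c=color: x['color'] == c)}}, should make the deferred-collect block print the expected per-color results.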

cc [~davies] / [~joshrosen]

> Failure to maintain consistent RDD references in pyspark
> --------------------------------------------------------
>
>                 Key: SPARK-12824
>                 URL: https://issues.apache.org/jira/browse/SPARK-12824
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.5.2
>         Environment: Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.
>            Reporter: Paul Shearer
>
> Below is a simple {{pyspark}} script that tries to split an RDD into a dictionary containing several RDDs. 
> As the *sample run* shows, the script only works if we do a {{collect()}} on the intermediate RDDs as they are created. Of course I would not want to do that in practice, since it doesn't scale.
> What's really strange is that I'm not assigning the intermediate {{collect()}} results to any variable, so the difference in behavior is due solely to a hidden side effect of the computation triggered by the {{collect()}} call.
> Spark is supposed to be a very functional framework with minimal side effects. Why is it only possible to get the desired behavior by triggering some mysterious side effect using {{collect()}}? 
> It seems that all the keys in the dictionary are referencing the same object even though in the code they are clearly supposed to be different objects.
> The run below is with Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.
> h3. spark_script.py
> {noformat}
>     from pprint import PrettyPrinter
>     pp = PrettyPrinter(indent=4).pprint
>     logger = sc._jvm.org.apache.log4j
>     logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
>     logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
>     
>     def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
>         d = dict()
>         for key_value in key_values:
>             d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
>             if collect_in_loop:
>                 d[key_value].collect()
>         return d
>     def print_results(d):
>         for k in d:
>             print k
>             pp(d[k].collect())    
>     
>     rdd = sc.parallelize([
>         {'color': 'red', 'size': 3},
>         {'color': 'red', 'size': 7},
>         {'color': 'red', 'size': 8},
>         {'color': 'red', 'size': 10},
>         {'color': 'green', 'size': 9},
>         {'color': 'green', 'size': 5},
>         {'color': 'green', 'size': 50},
>         {'color': 'blue', 'size': 4},
>         {'color': 'purple', 'size': 6}])
>     key_field = 'color'
>     key_values = ['red', 'green', 'blue', 'purple']
>     
>     print '### run WITH collect in loop: '
>     d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
>     print_results(d)
>     print '### run WITHOUT collect in loop: '
>     d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
>     print_results(d)
> {noformat}
> h3. Sample run in IPython shell
> {noformat}
>     In [1]: execfile('spark_script.py')
>     ### run WITH collect in loop: 
>     blue
>     [{   'color': 'blue', 'size': 4}]
>     purple
>     [{   'color': 'purple', 'size': 6}]
>     green
>     [   {   'color': 'green', 'size': 9},
>         {   'color': 'green', 'size': 5},
>         {   'color': 'green', 'size': 50}]
>     red
>     [   {   'color': 'red', 'size': 3},
>         {   'color': 'red', 'size': 7},
>         {   'color': 'red', 'size': 8},
>         {   'color': 'red', 'size': 10}]
>     ### run WITHOUT collect in loop: 
>     blue
>     [{   'color': 'purple', 'size': 6}]
>     purple
>     [{   'color': 'purple', 'size': 6}]
>     green
>     [{   'color': 'purple', 'size': 6}]
>     red
>     [{   'color': 'purple', 'size': 6}]
> {noformat}


