You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Maciej Szymkiewicz (JIRA)" <ji...@apache.org> on 2016/01/14 20:52:39 UTC

[jira] [Comment Edited] (SPARK-12824) Failure to maintain consistent RDD references in pyspark

    [ https://issues.apache.org/jira/browse/SPARK-12824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15098738#comment-15098738 ] 

Maciej Szymkiewicz edited comment on SPARK-12824 at 1/14/16 7:51 PM:
---------------------------------------------------------------------

??It seems that all the keys in the dictionary are referencing the same object even though in the code they are clearly supposed to be different objects??

It doesn't look like it. Both Python objects and referenced jRDDs seem to be distinct.

 More likely it is an issue with evaluation of the {{key_value}}.

{code}
def execute(k):                                         
    def execute_():
        return  rdd.filter(lambda row: row['color'] == k)
    return execute_

kvs = {k: execute(k)()  for k in key_values}
for k, v in kvs.items():
    print(k, v.collect())

## blue [{'color': 'blue', 'size': 4}]
## green [{'color': 'green', 'size': 9}, {'color': 'green', 'size': 5}, {'color': 'green', 'size': 50}]
## red [{'color': 'red', 'size': 3}, {'color': 'red', 'size': 7}, {'color': 'red', 'size': 8}, {'color': 'red', 'size': 10}]
## purple [{'color': 'purple', 'size': 6}]
{code}


was (Author: zero323):
??It seems that all the keys in the dictionary are referencing the same object even though in the code they are clearly supposed to be different objects??

It doesn't look like it. Both Python objects and referenced jRDDs seem to be distinct.

 More likely it is an issue with evaluation of the {key_value}.

{code}
def execute(k):                                         
    def execute_():
        return  rdd.filter(lambda row: row['color'] == k)
    return execute_

kvs = {k: execute(k)()  for k in key_values}
for k, v in kvs.items():
    print(k, v.collect())

## blue [{'color': 'blue', 'size': 4}]
## green [{'color': 'green', 'size': 9}, {'color': 'green', 'size': 5}, {'color': 'green', 'size': 50}]
## red [{'color': 'red', 'size': 3}, {'color': 'red', 'size': 7}, {'color': 'red', 'size': 8}, {'color': 'red', 'size': 10}]
## purple [{'color': 'purple', 'size': 6}]
{code}

> Failure to maintain consistent RDD references in pyspark
> --------------------------------------------------------
>
>                 Key: SPARK-12824
>                 URL: https://issues.apache.org/jira/browse/SPARK-12824
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.5.2
>         Environment: Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.
>            Reporter: Paul Shearer
>
> Below is a simple {{pyspark}} script that tries to split an RDD into a dictionary containing several RDDs. 
> As the *sample run* shows, the script only works if we do a {{collect()}} on the intermediate RDDs as they are created. Of course I would not want to do that in practice, since it doesn't scale.
> What's really strange is, I'm not assigning the intermediate {{collect()}} results to any variable. So the difference in behavior is due solely to a hidden side-effect of the computation triggered by the {{collect()}} call. 
> Spark is supposed to be a very functional framework with minimal side effects. Why is it only possible to get the desired behavior by triggering some mysterious side effect using {{collect()}}? 
> It seems that all the keys in the dictionary are referencing the same object even though in the code they are clearly supposed to be different objects.
> The run below is with Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.
> h3. spark_script.py
> {noformat}
>     from pprint import PrettyPrinter
>     pp = PrettyPrinter(indent=4).pprint
>     logger = sc._jvm.org.apache.log4j
>     logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
>     logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
>     
>     def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
>         d = dict()
>         for key_value in key_values:
>             d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
>             if collect_in_loop:
>                 d[key_value].collect()
>         return d
>     def print_results(d):
>         for k in d:
>             print k
>             pp(d[k].collect())    
>     
>     rdd = sc.parallelize([
>         {'color':'red','size':3},
>         {'color':'red', 'size':7},
>         {'color':'red', 'size':8},    
>         {'color':'red', 'size':10},
>         {'color':'green', 'size':9},
>         {'color':'green', 'size':5},
>         {'color':'green', 'size':50},    
>         {'color':'blue', 'size':4},
>         {'color':'purple', 'size':6}])
>     key_field = 'color'
>     key_values = ['red', 'green', 'blue', 'purple']
>     
>     print '### run WITH collect in loop: '
>     d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
>     print_results(d)
>     print '### run WITHOUT collect in loop: '
>     d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
>     print_results(d)
> {noformat}
> h3. Sample run in IPython shell
> {noformat}
>     In [1]: execfile('spark_script.py')
>     ### run WITH collect in loop: 
>     blue
>     [{   'color': 'blue', 'size': 4}]
>     purple
>     [{   'color': 'purple', 'size': 6}]
>     green
>     [   {   'color': 'green', 'size': 9},
>         {   'color': 'green', 'size': 5},
>         {   'color': 'green', 'size': 50}]
>     red
>     [   {   'color': 'red', 'size': 3},
>         {   'color': 'red', 'size': 7},
>         {   'color': 'red', 'size': 8},
>         {   'color': 'red', 'size': 10}]
>     ### run WITHOUT collect in loop: 
>     blue
>     [{   'color': 'purple', 'size': 6}]
>     purple
>     [{   'color': 'purple', 'size': 6}]
>     green
>     [{   'color': 'purple', 'size': 6}]
>     red
>     [{   'color': 'purple', 'size': 6}]
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org