Posted to issues@spark.apache.org by "Paul Shearer (JIRA)" <ji...@apache.org> on 2016/01/14 13:19:39 UTC

[jira] [Created] (SPARK-12824) Failure to maintain consistent RDD references in pyspark

Paul Shearer created SPARK-12824:
------------------------------------

             Summary: Failure to maintain consistent RDD references in pyspark
                 Key: SPARK-12824
                 URL: https://issues.apache.org/jira/browse/SPARK-12824
             Project: Spark
          Issue Type: Bug
         Environment: Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.

            Reporter: Paul Shearer


Below is a simple `pyspark` script that tries to split an RDD into a dictionary mapping each key value to its own RDD.

As the **sample run** shows, the script only produces the expected result if we call `collect()` on each intermediate RDD as it is created. Of course I would not want to do that in practice, since it doesn't scale.

What's really strange is that I'm not assigning the intermediate `collect()` results to any variable, so the difference in behavior must come entirely from a hidden side effect of the computation triggered by the `collect()` call.

Spark is supposed to be a very functional framework with minimal side effects. Why is it only possible to get the desired behavior by triggering some mysterious side effect using `collect()`?
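
For comparison, here is a minimal plain-Python sketch (no Spark involved) showing that lambdas created in a loop all close over the loop variable itself rather than its value at creation time. I am not claiming this is what happens inside `pyspark`; it just produces a suspiciously similar pattern of results:

    # Plain-Python illustration only (no Spark): each lambda closes over the
    # loop variable key_value itself, so once the loop finishes they all
    # compare against the final value, 'purple'.
    key_values = ['red', 'green', 'blue', 'purple']
    filters = {}
    for key_value in key_values:
        filters[key_value] = lambda row: row['color'] == key_value

    for k in key_values:
        # test each filter on a row of "its own" color; only 'purple' matches
        print k, filters[k]({'color': k})

That snippet prints `False` for every color except `purple`, which looks a lot like the WITHOUT-collect output below, but I have not verified that the same mechanism applies to the closures pyspark serializes and ships to the executors.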

The run below is with Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.

### spark_script.py

    from pprint import PrettyPrinter
    pp = PrettyPrinter(indent=4).pprint
    # quiet Spark's logging so the script's own output is easy to read
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
    logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)
    
    def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
        # build one filtered RDD per key value, optionally collecting each
        # one immediately as it is created
        d = dict()
        for key_value in key_values:
            d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
            if collect_in_loop:
                d[key_value].collect()
        return d

    def print_results(d):
        # collect and pretty-print the contents of each RDD in the dictionary
        for k in d:
            print k
            pp(d[k].collect())
    
    rdd = sc.parallelize([
        {'color': 'red', 'size': 3},
        {'color': 'red', 'size': 7},
        {'color': 'red', 'size': 8},
        {'color': 'red', 'size': 10},
        {'color': 'green', 'size': 9},
        {'color': 'green', 'size': 5},
        {'color': 'green', 'size': 50},
        {'color': 'blue', 'size': 4},
        {'color': 'purple', 'size': 6}])
    key_field = 'color'
    key_values = ['red', 'green', 'blue', 'purple']
    
    print '### run WITH collect in loop: '
    d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
    print_results(d)
    print '### run WITHOUT collect in loop: '
    d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
    print_results(d)

### Sample run in IPython shell

    In [1]: execfile('spark_script.py')
    ### run WITH collect in loop: 
    blue
    [{   'color': 'blue', 'size': 4}]
    purple
    [{   'color': 'purple', 'size': 6}]
    green
    [   {   'color': 'green', 'size': 9},
        {   'color': 'green', 'size': 5},
        {   'color': 'green', 'size': 50}]
    red
    [   {   'color': 'red', 'size': 3},
        {   'color': 'red', 'size': 7},
        {   'color': 'red', 'size': 8},
        {   'color': 'red', 'size': 10}]
    ### run WITHOUT collect in loop: 
    blue
    [{   'color': 'purple', 'size': 6}]
    purple
    [{   'color': 'purple', 'size': 6}]
    green
    [{   'color': 'purple', 'size': 6}]
    red
    [{   'color': 'purple', 'size': 6}]
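
If closure capture of `key_value` is indeed the culprit here (an assumption on my part, not something confirmed against the pyspark internals), one possible workaround is to bind the value eagerly through a default argument, sketched below; the function is otherwise identical to `split_RDD_by_key` above:

    # Sketch of a possible workaround, assuming the closure-capture explanation
    # is correct: kv=key_value freezes the current value when each lambda is
    # defined, instead of looking up key_value after the loop has finished.
    def split_RDD_by_key_bound(rdd, key_field, key_values):
        d = dict()
        for key_value in key_values:
            d[key_value] = rdd.filter(
                lambda row, kv=key_value: row[key_field] == kv)
        return d

With that variant, `print_results(split_RDD_by_key_bound(rdd, key_field, key_values))` should print the same per-color groups as the WITH-collect run, without any `collect()` inside the loop; again, this assumes the capture explanation holds.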


