Posted to issues@spark.apache.org by "Paul Shearer (JIRA)" <ji...@apache.org> on 2016/01/14 13:20:39 UTC
[jira] [Updated] (SPARK-12824) Failure to maintain consistent RDD references in pyspark
[ https://issues.apache.org/jira/browse/SPARK-12824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Paul Shearer updated SPARK-12824:
---------------------------------
Description:
Below is a simple `pyspark` script that tries to split an RDD into a dictionary containing several RDDs.
As the **sample run** shows, the script only works if we do a `collect()` on the intermediate RDDs as they are created. Of course I would not want to do that in practice, since it doesn't scale.
What's really strange is that I'm not assigning the intermediate `collect()` results to any variable. So the difference in behavior is due solely to a hidden side effect of the computation triggered by the `collect()` call.
Spark is supposed to be a very functional framework with minimal side effects. Why is it only possible to get the desired behavior by triggering some mysterious side effect using `collect()`?
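A plausible (but unverified here) explanation is Python's late-binding closures: a lambda created in a loop captures the loop *variable*, not its value at that iteration. A minimal, Spark-free sketch of that behavior, with hypothetical names:
```
d = dict()
for v in ['red', 'green', 'blue', 'purple']:
    # Each lambda refers to the variable v itself; v is looked up when the
    # lambda is *called*, by which time the loop has set it to 'purple'.
    d[v] = lambda row: row['color'] == v

print d['red']({'color': 'red'})     # False -- compares against 'purple'
print d['red']({'color': 'purple'})  # True
```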
The run below is with Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.
spark_script.py
```
from pprint import PrettyPrinter
pp = PrettyPrinter(indent=4).pprint

# assumes `sc` is the SparkContext provided by the pyspark/IPython shell
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)

def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
    d = dict()
    for key_value in key_values:
        d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
        if collect_in_loop:
            d[key_value].collect()
    return d

def print_results(d):
    for k in d:
        print k
        pp(d[k].collect())

rdd = sc.parallelize([
    {'color': 'red',    'size': 3},
    {'color': 'red',    'size': 7},
    {'color': 'red',    'size': 8},
    {'color': 'red',    'size': 10},
    {'color': 'green',  'size': 9},
    {'color': 'green',  'size': 5},
    {'color': 'green',  'size': 50},
    {'color': 'blue',   'size': 4},
    {'color': 'purple', 'size': 6}])
key_field = 'color'
key_values = ['red', 'green', 'blue', 'purple']

print '### run WITH collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
print_results(d)
print '### run WITHOUT collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
print_results(d)
```
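If late binding is indeed the cause, the usual fix is to freeze each loop value with a default argument. A hedged sketch of the helper rewritten that way (the name `split_RDD_by_key_fixed` is hypothetical; the sample run below still uses the original script):
```
def split_RDD_by_key_fixed(rdd, key_field, key_values):
    d = dict()
    for key_value in key_values:
        # kv=key_value evaluates key_value *now*, so each lambda keeps its
        # own frozen copy instead of sharing the loop variable.
        d[key_value] = rdd.filter(lambda row, kv=key_value: row[key_field] == kv)
    return d
```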
Sample run in IPython shell
```
In [1]: execfile('spark_script.py')
### run WITH collect in loop:
blue
[{ 'color': 'blue', 'size': 4}]
purple
[{ 'color': 'purple', 'size': 6}]
green
[ { 'color': 'green', 'size': 9},
  { 'color': 'green', 'size': 5},
  { 'color': 'green', 'size': 50}]
red
[ { 'color': 'red', 'size': 3},
  { 'color': 'red', 'size': 7},
  { 'color': 'red', 'size': 8},
  { 'color': 'red', 'size': 10}]
### run WITHOUT collect in loop:
blue
[{ 'color': 'purple', 'size': 6}]
purple
[{ 'color': 'purple', 'size': 6}]
green
[{ 'color': 'purple', 'size': 6}]
red
[{ 'color': 'purple', 'size': 6}]
```
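Aside: since collecting each subset inside the loop doesn't scale, a single-pass alternative worth considering is to group by the key instead of building one filtered RDD per key value. A sketch, assuming the same `rdd` and dict-shaped rows as above (note it yields local lists on the driver, not RDDs):
```
# One job over the data instead of one filter per key value.
grouped = rdd.groupBy(lambda row: row['color'])        # RDD of (key, rows)
d = {k: list(rows) for k, rows in grouped.collect()}   # plain lists, not RDDs
```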
> Failure to maintain consistent RDD references in pyspark
> --------------------------------------------------------
>
> Key: SPARK-12824
> URL: https://issues.apache.org/jira/browse/SPARK-12824
> Project: Spark
> Issue Type: Bug
> Environment: Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.
> Reporter: Paul Shearer
>