Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2015/08/05 18:15:05 UTC

[jira] [Assigned] (SPARK-6551) Incorrect aggregate results if seqOp(...) mutates its first argument

     [ https://issues.apache.org/jira/browse/SPARK-6551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-6551:
-----------------------------------

    Assignee:     (was: Apache Spark)

> Incorrect aggregate results if seqOp(...) mutates its first argument
> --------------------------------------------------------------------
>
>                 Key: SPARK-6551
>                 URL: https://issues.apache.org/jira/browse/SPARK-6551
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.3.0
>         Environment: Amazon EMR, AMI version 3.5
>            Reporter: Jarno Seppanen
>
> The Python RDD.aggregate method does not match its documentation when seqOp mutates its first argument:
> * the results are incorrect if seqOp mutates its first argument
> * additionally, the zero value is modified if combOp mutates its first argument (this is slightly surprising and would be worth documenting)
> I'm aggregating the RDD into a nontrivial data structure, and copying the whole structure into a new instance on every seqOp call would be wasteful, so being able to mutate the accumulator is important.
> I'm seeing the following behavior:
> {code}
> def inc_mutate(counter, item):
>     counter[0] += 1
>     return counter
> def inc_pure(counter, item):
>     return [counter[0] + 1]
> def merge_mutate(c1, c2):
>     c1[0] += c2[0]
>     return c1
> def merge_pure(c1, c2):
>     return [c1[0] + c2[0]]
> # correct answer when neither function mutates its arguments
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_pure, merge_pure)
> # [10]
> init
> # [0]
> # incorrect answer if seqOp mutates its first argument
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_pure)
> # [20] <- WRONG
> init
> # [0]
> # zero value is modified if combOp mutates its first argument
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_pure, merge_mutate)
> # [10]
> init
> # [10]
> # for completeness
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_mutate)
> # [20]
> init
> # [20]
> {code}
> I'm running on an EMR cluster launched with:
> {code}
> aws emr create-cluster --name jarno-spark \
>  --ami-version 3.5 \
>  --instance-type c3.8xlarge \
>  --instance-count 5 \
>  --ec2-attributes KeyName=foo \
>  --applications Name=Ganglia \
>  --log-uri s3://foo/log \
>  --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args=[-g,-x,-l,ERROR]
> {code}
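
The four results in the first snippet are consistent with one possible mechanism: within each worker, the seqOp stage and a following merge stage both refer to the same zero-value object, while the driver merges the partial results starting from the caller's own object. The report does not state the cause, so the pure-Python model below is only an assumption about what might be happening, not PySpark's actual code. `simulate_aggregate` and `run` are hypothetical names introduced for this sketch; the four helper functions are copied from the report.

```python
import copy
from functools import reduce


def inc_mutate(counter, item):
    counter[0] += 1
    return counter


def inc_pure(counter, item):
    return [counter[0] + 1]


def merge_mutate(c1, c2):
    c1[0] += c2[0]
    return c1


def merge_pure(c1, c2):
    return [c1[0] + c2[0]]


def simulate_aggregate(partitions, zero, seq_op, comb_op):
    """Hypothetical model of the suspected aliasing; not PySpark code."""
    partials = []
    for part in partitions:
        # Assumption: each worker gets its own deserialized copy of the zero
        # value, and both per-partition stages refer to that one object.
        worker_zero = copy.deepcopy(zero)

        # Stage 1: apply seq_op over the partition, starting from worker_zero.
        acc = worker_zero
        for item in part:
            acc = seq_op(acc, item)

        # Stage 2: merge the partition result with worker_zero again.
        # If seq_op mutated worker_zero, acc and worker_zero are the same
        # object, so the partition's count is added to itself.
        partials.append(comb_op(acc, worker_zero))

    # Driver: merge the partial results, starting from the caller's zero.
    # A mutating comb_op therefore modifies the caller's object.
    return reduce(comb_op, partials, zero)


def run(seq_op, comb_op):
    init = [0]
    partitions = [range(0, 5), range(5, 10)]  # range(10) split in two
    result = simulate_aggregate(partitions, init, seq_op, comb_op)
    return result, init


print(run(inc_pure, merge_pure))      # ([10], [0])
print(run(inc_mutate, merge_pure))    # ([20], [0])
print(run(inc_pure, merge_mutate))    # ([10], [10])
print(run(inc_mutate, merge_mutate))  # ([20], [20])
```

Under this model the doubled count does not depend on how the data is partitioned, because each partition adds its own count to itself before the driver sees it. If the model is right, giving each stage its own copy of the zero value should remove both effects, at the cost of one copy per stage rather than one per seqOp call.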



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)