Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/01/03 09:55:39 UTC
[jira] [Resolved] (SPARK-6416) RDD.fold() requires the operator to be commutative
[ https://issues.apache.org/jira/browse/SPARK-6416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-6416.
------------------------------
Resolution: Fixed
Assignee: Sean Owen
Fix Version/s: 1.5.0
Target Version/s: (was: 2.0.0)
Hm! I should have tried that myself. I think that's a good argument that at least it's not inconsistent. The behavior was documented by an earlier change, so I'm calling this resolved, retroactively.
> RDD.fold() requires the operator to be commutative
> --------------------------------------------------
>
> Key: SPARK-6416
> URL: https://issues.apache.org/jira/browse/SPARK-6416
> Project: Spark
> Issue Type: Bug
> Components: Documentation, Spark Core
> Affects Versions: 1.4.0
> Reporter: Josh Rosen
> Assignee: Sean Owen
> Priority: Critical
> Fix For: 1.5.0
>
>
> Spark's {{RDD.fold}} operation has some confusing behaviors when a non-commutative reduce function is used.
> Here's an example, which was originally reported on StackOverflow (https://stackoverflow.com/questions/29150202/pyspark-fold-method-output):
> {code}
> >>> sc.parallelize([1, 25, 8, 4, 2]).fold(0, lambda a, b: a + 1)
> 8
> {code}
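> (A hint at where the 8 comes from, assuming PySpark's defaults: with a {{local[*]}} master, {{sc.defaultParallelism}} is the number of cores, and that is how many partitions the RDD gets. You can check the partition count directly:)
> {code}
> >>> sc.parallelize([1, 25, 8, 4, 2]).getNumPartitions()
> 8
> {code}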
> To understand what's going on here, let's look at the definition of Spark's `fold` operation.
> I'm going to show the Python version of the code, but the Scala version exhibits the exact same behavior (you can also [browse the source on GitHub|https://github.com/apache/spark/blob/8cb23a1f9a3ed08e57865bcb6cc1cc7902881073/python/pyspark/rdd.py#L780]):
> {code}
> def fold(self, zeroValue, op):
>     """
>     Aggregate the elements of each partition, and then the results for all
>     the partitions, using a given associative function and a neutral "zero
>     value."
>
>     The function C{op(t1, t2)} is allowed to modify C{t1} and return it
>     as its result value to avoid object allocation; however, it should not
>     modify C{t2}.
>
>     >>> from operator import add
>     >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
>     15
>     """
>     def func(iterator):
>         acc = zeroValue
>         for obj in iterator:
>             acc = op(obj, acc)
>         yield acc
>     vals = self.mapPartitions(func).collect()
>     return reduce(op, vals, zeroValue)
> {code}
> (For comparison, see the [Scala implementation of `RDD.fold`|https://github.com/apache/spark/blob/8cb23a1f9a3ed08e57865bcb6cc1cc7902881073/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L943]).
> Spark's `fold` operates by first folding each partition and then folding the results. The problem is that an empty partition gets folded down to the zero element, so the final driver-side fold ends up folding one value for _every_ partition rather than one value for each _non-empty_ partition. This means that the result of `fold` is sensitive to the number of partitions:
> {code}
> >>> sc.parallelize([1, 25, 8, 4, 2], 100).fold(0, lambda a, b: a + 1)
> 100
> >>> sc.parallelize([1, 25, 8, 4, 2], 50).fold(0, lambda a, b: a + 1)
> 50
> >>> sc.parallelize([1, 25, 8, 4, 2], 1).fold(0, lambda a, b: a + 1)
> 1
> {code}
> In this last case, what's happening is that the single partition is folded down to a single value, and then that value is folded together with the zero value at the driver; since the operator just returns its first argument plus one, {{op(0, v)}} yields 1.
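> To make the mechanics concrete, here is a minimal pure-Python sketch of the two-level fold (no SparkContext needed; the round-robin split below is an assumption about how elements land in partitions, but for this operator the result depends only on the partition count):
> {code}
> from functools import reduce
>
> def simulate_fold(data, num_partitions, zero, op):
>     # Phase 1: fold each partition with op(obj, acc), like func() above.
>     # Empty partitions simply yield the zero value.
>     partitions = [data[i::num_partitions] for i in range(num_partitions)]
>     vals = []
>     for part in partitions:
>         acc = zero
>         for obj in part:
>             acc = op(obj, acc)
>         vals.append(acc)
>     # Phase 2: driver-side fold over one value per partition, empty or not.
>     return reduce(op, vals, zero)
>
> count_op = lambda a, b: a + 1
> print([simulate_fold([1, 25, 8, 4, 2], n, 0, count_op) for n in (100, 50, 1)])
> # [100, 50, 1] -- the driver fold applies op exactly once per partition,
> # so the "count" is really the partition count.
> {code}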
> I think the underlying problem here is that our fold() operation implicitly requires the operator to be commutative in addition to associative, but this isn't documented anywhere. Due to ordering non-determinism elsewhere in Spark, such as SPARK-5750, I don't think there's an easy way to fix this. Therefore, I think we should update the documentation and examples to clarify this requirement and explain that our fold acts more like a reduce with a default value than the type of ordering-sensitive fold() that users may expect in functional languages.
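> (As an aside, not part of the original proposal: for the counting use case specifically, {{RDD.aggregate}} already expresses the intent correctly, because it takes separate per-element and merge operators, so empty partitions contribute only the zero value, which is the identity for the merge. A sketch:)
> {code}
> >>> sc.parallelize([1, 25, 8, 4, 2], 100).aggregate(
> ...     0,
> ...     lambda acc, _: acc + 1,  # seqOp: one increment per actual element
> ...     lambda a, b: a + b)      # combOp: commutative, with 0 as identity
> 5
> {code}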
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org