Posted to issues@spark.apache.org by "Patrick Wendell (JIRA)" <ji...@apache.org> on 2015/04/09 04:20:12 UTC

[jira] [Resolved] (SPARK-6792) pySpark groupByKey returns rows with the same key

     [ https://issues.apache.org/jira/browse/SPARK-6792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrick Wendell resolved SPARK-6792.
------------------------------------
    Resolution: Not A Problem

Resolving per Josh's comment.

> pySpark groupByKey returns rows with the same key
> -------------------------------------------------
>
>                 Key: SPARK-6792
>                 URL: https://issues.apache.org/jira/browse/SPARK-6792
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.3.0
>            Reporter: Charles Hayden
>
> Under some circumstances, pySpark groupByKey returns two or more rows with the same groupby key.
> It is not reproducible by a short example, but it can be seen in the following program.
> The preservesPartitioning argument is required to see the failure.
> I ran this with cluster_url=local[4], but I think it will also show up with cluster_url=local.
> =====================================================
> {noformat}
>     # RDD.groupByKey sometimes gives two results with the same key value. This is incorrect: all results with a single key need to be grouped together.
>     # Report the spark version
>     from pyspark import SparkContext
>     import StringIO
>     import csv
>     sc = SparkContext()
>     print sc.version
>     def loadRecord(line):
>         input = StringIO.StringIO(line)
>         reader = csv.reader(input, delimiter='\t')
>         return reader.next()
>     # Read data from movielens dataset
>     # This can be obtained from http://files.grouplens.org/datasets/movielens/ml-100k.zip
>     inputFile = 'u.data'
>     input = sc.textFile(inputFile)
>     data = input.map(loadRecord)
>     # Trim off unneeded fields
>     data = data.map(lambda row: row[0:2])
>     print 'Data Sample'
>     print data.take(10)
>     # Use join to filter the data
>     #
>     # map builds left key
>     # map builds right key
>     # join
>     # map throws away the key and gets result
>     # pick a couple of users
>     j = sc.parallelize([789, 939])
>     # key left
>     # conversion to str is required to show the error
>     keyed_j = j.map(lambda row: (str(row), None))
>     # key right
>     keyed_rdd = data.map(lambda row: (str(row[0]), row))
>     # join
>     joined = keyed_rdd.join(keyed_j)
>     # throw away key
>     # preservesPartitioning is required to show the error
>     res = joined.map(lambda row: row[1][0], preservesPartitioning=True)
>     #res = joined.map(lambda row: row[1][0])      # no error
>     print 'Filtered Sample'
>     print res.take(10)
>     #print res.count()
>     # Do the groupby
>     # There should be fewer rows
>     keyed_rdd = res.map(lambda row: (row[1], row), preservesPartitioning=True)
>     print 'Input Count', keyed_rdd.count()
>     grouped_rdd = keyed_rdd.groupByKey()
>     print 'Grouped Count', grouped_rdd.count()
>     # There are two rows with the same key!
>     print 'Group Output Sample'
>     print grouped_rdd.filter(lambda row: row[0] == '508').take(10)
> {noformat}
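
The resolution above does not restate the reasoning, so the following is only a plausible reading, not a quote of the referenced comment. It assumes that groupByKey can skip its shuffle when an RDD still reports a partitioner, and that preservesPartitioning=True keeps that partitioner even though both map calls in the program change the key. Under those assumptions, duplicate keys are the expected outcome. The sketch below simulates this in plain Python without Spark; the helper names, the sample rows, and the use of integer keys are all hypothetical and chosen only to keep hash() deterministic.

```python
from collections import defaultdict

NUM_PARTITIONS = 4  # assumption: mirrors local[4] from the report


def partition_by_key(pairs, num_partitions=NUM_PARTITIONS):
    """Simulate a shuffle: send each (key, value) pair to hash(key) % n."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions


def group_without_shuffle(partitions):
    """Group by key inside each partition only, trusting the existing layout."""
    grouped = []
    for part in partitions:
        groups = defaultdict(list)
        for key, value in part:
            groups[key].append(value)
        grouped.extend(groups.items())
    return grouped


# Hypothetical (user, item) rows; integer keys keep hash() deterministic.
rows = [(1, 508), (2, 508), (1, 100), (3, 100), (2, 7)]

# After the join, the data is laid out by user id.
by_user = partition_by_key([(user, (user, item)) for user, item in rows])

# Re-key by item in place, as a map with preservesPartitioning=True would:
# the layout is untouched, but it no longer matches the new keys.
rekeyed = [[(row[1], row) for _, row in part] for part in by_user]

# Grouping that trusts the stale layout can yield the same item more than once.
stale = group_without_shuffle(rekeyed)

# Grouping after a real shuffle on the new key yields each item once.
flat = [pair for part in rekeyed for pair in part]
fresh = group_without_shuffle(partition_by_key(flat))

print("stale layout: ", sorted(key for key, _ in stale))
print("after shuffle:", sorted(key for key, _ in fresh))
```

In the reported program, the equivalent change would be to drop preservesPartitioning=True from the map calls that change the key, which matches the "no error" note on the commented-out alternative.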



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)