Posted to issues@spark.apache.org by "Patrick Wendell (JIRA)" <ji...@apache.org> on 2015/04/09 04:20:12 UTC
[jira] [Resolved] (SPARK-6792) pySpark groupByKey returns rows with the same key
[ https://issues.apache.org/jira/browse/SPARK-6792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Patrick Wendell resolved SPARK-6792.
------------------------------------
Resolution: Not A Problem
Resolving per Josh's comment.
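For context: preservesPartitioning=True tells Spark that the mapped function
does not change which partition each key belongs to, so downstream key-based
operators such as groupByKey may keep the existing partitioner and skip the
shuffle. The program below builds new keys inside the map, so that promise is
false: rows with the same new key can sit in different partitions, and each
partition then emits its own group. A minimal sketch of the distinction,
assuming an active SparkContext sc (toy data, not from the original report):

{noformat}
pairs = sc.parallelize([('a', 1), ('b', 2), ('a', 3)])

# Safe: only the values change, so the partitioning really is preserved
# (mapValues declares this internally) and later key-based steps can
# avoid an unnecessary shuffle.
doubled = pairs.mapValues(lambda v: v * 2)

# Unsafe: this map builds new keys, so preservesPartitioning=True would
# be a false promise here; omit it and groupByKey shuffles correctly.
regrouped = pairs.map(lambda kv: (kv[1], kv[0])).groupByKey()
{noformat}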
> pySpark groupByKey returns rows with the same key
> -------------------------------------------------
>
> Key: SPARK-6792
> URL: https://issues.apache.org/jira/browse/SPARK-6792
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 1.3.0
> Reporter: Charles Hayden
>
> Under some circumstances, pySpark groupByKey returns two or more rows with the same group-by key.
> It is not reproducible with a short example, but it can be seen in the following program.
> The preservesPartitioning argument is required to see the failure.
> I ran this with cluster_url=local[4], but I think it will also show up with cluster_url=local.
> =====================================================
> {noformat}
> # The RDD.groupByKey sometimes gives two results with the same key value.
> # This is incorrect: all results with a single key need to be grouped together.
> # Report the spark version
> from pyspark import SparkContext
> import StringIO
> import csv
> sc = SparkContext()
> print sc.version
> def loadRecord(line):
>     input = StringIO.StringIO(line)
>     reader = csv.reader(input, delimiter='\t')
>     return reader.next()
> # Read data from movielens dataset
> # This can be obtained from http://files.grouplens.org/datasets/movielens/ml-100k.zip
> inputFile = 'u.data'
> input = sc.textFile(inputFile)
> data = input.map(loadRecord)
> # Trim off unneeded fields
> data = data.map(lambda row: row[0:2])
> print 'Data Sample'
> print data.take(10)
> # Use join to filter the data
> #
> # map builds left key
> # map builds right key
> # join
> # map throws away the key and gets result
> # pick a couple of users
> j = sc.parallelize([789, 939])
> # key left
> # conversion to str is required to show the error
> keyed_j = j.map(lambda row: (str(row), None))
> # key right
> keyed_rdd = data.map(lambda row: (str(row[0]), row))
> # join
> joined = keyed_rdd.join(keyed_j)
> # throw away key
> # preservesPartitioning is required to show the error
> res = joined.map(lambda row: row[1][0], preservesPartitioning=True)
> #res = joined.map(lambda row: row[1][0]) # no error
> print 'Filtered Sample'
> print res.take(10)
> #print res.count()
> # Do the groupby
> # There should be fewer rows
> keyed_rdd = res.map(lambda row: (row[1], row), preservesPartitioning=True)
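> # Note: the lambda above builds new keys, so asserting preservesPartitioning
> # is a false promise; groupByKey trusts the old partitioner, skips the
> # shuffle, and only groups within each partition.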
> print 'Input Count', keyed_rdd.count()
> grouped_rdd = keyed_rdd.groupByKey()
> print 'Grouped Count', grouped_rdd.count()
> # There are two rows with the same key !
> print 'Group Output Sample'
> print grouped_rdd.filter(lambda row: row[0] == '508').take(10)
> {noformat}