Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2015/04/15 22:36:59 UTC
[jira] [Resolved] (SPARK-6886) Big closure in PySpark will fail during shuffle
[ https://issues.apache.org/jira/browse/SPARK-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Josh Rosen resolved SPARK-6886.
-------------------------------
Resolution: Fixed
Fix Version/s: 1.4.0, 1.3.2, 1.2.3
This should be fixed for 1.2.3, 1.3.2, and 1.4. Thanks!
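For context, the failure mode is specific to task closures big enough that PySpark ships them to executors through a broadcast temp file. A minimal sketch that reproduced the symptom on the affected versions (the app name and the 1 << 20 size are illustrative, chosen to be safely past PySpark's internal command-size threshold):
{code}
from pyspark import SparkContext

sc = SparkContext(appName="spark-6886-repro")

# A large driver-side object captured in the task closure; once the pickled
# command passes an internal size limit, PySpark ships it as a broadcast.
big = list(range(1 << 20))

# The map stage runs; on affected versions the reduce side of reduceByKey
# failed with the IOError quoted below, because the temp file backing the
# pickled closure had already been removed.
counts = sc.parallelize(range(100), 4)\
    .map(lambda x: (x % 10, len(big)))\
    .reduceByKey(lambda a, b: a + b)

print counts.collect()
{code}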
> Big closure in PySpark will fail during shuffle
> -----------------------------------------------
>
> Key: SPARK-6886
> URL: https://issues.apache.org/jira/browse/SPARK-6886
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 1.2.1, 1.3.0, 1.4.0
> Reporter: Davies Liu
> Assignee: Davies Liu
> Priority: Blocker
> Fix For: 1.2.3, 1.3.2, 1.4.0
>
>
> Reported by beifei.zhou <beifei.zhou at ximalaya.com>:
> I am using Spark to process big datasets. However, there is always a problem when executing reduceByKey on a large dataset, whereas it works with a smaller dataset. May I ask how I could solve this issue?
> The error is always like this:
> {code}
> 15/04/09 11:27:46 ERROR Executor: Exception in task 3.0 in stage 1.0 (TID 5)
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/Users/nali/Softwares/spark/python/pyspark/worker.py", line 90, in main
>     command = pickleSer.loads(command.value)
>   File "/Users/nali/Softwares/spark/python/pyspark/broadcast.py", line 106, in value
>     self._value = self.load(self._path)
>   File "/Users/nali/Softwares/spark/python/pyspark/broadcast.py", line 87, in load
>     with open(path, 'rb', 1 << 20) as f:
> IOError: [Errno 2] No such file or directory: '/private/var/folders/_x/n59vb1b54pl96lvldz2lr_v40000gn/T/spark-37d8ecbc-9ac9-4aa2-be23-12823f4cd1ed/pyspark-1e3d5904-a5b6-4222-a146-91bfdb4a33a7/tmp8XMhgG'
> {code}
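> The traceback shows the mechanism: worker.py re-reads the pickled command through Broadcast.value, which lazily loads a temp file written when the closure was shipped, and on the affected versions that file could be cleaned up before reduce-side tasks re-read it. A toy sketch of the load path, paraphrased from the traceback rather than from the actual pyspark/broadcast.py source:
> {code}
> import cPickle
>
> class FileBackedBroadcast(object):
>     """Toy stand-in for PySpark's file-backed Broadcast."""
>     def __init__(self, path):
>         self._path = path  # temp file holding the pickled value
>
>     @property
>     def value(self):
>         # Raises IOError [Errno 2] if the temp file is already gone,
>         # which is exactly the failure quoted above.
>         with open(self._path, 'rb', 1 << 20) as f:
>             return cPickle.load(f)
> {code}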
> Here I attach my code:
> {code}
> import codecs
> from pyspark import SparkContext, SparkConf
> from operator import add
> import operator
> from pyspark.storagelevel import StorageLevel
>
> def combine_dict(a, b):
>     a.update(b)
>     return a
>
> conf = SparkConf()
> sc = SparkContext(appName="tag")
>
> # {album_id: [tag, ...]} built on the driver and captured in the closures
> # below; this dict is the "big closure" that trips the bug on a large input.
> al_tag_dict = sc.textFile('albumtag.txt')\
>     .map(lambda x: x.split(','))\
>     .map(lambda x: {x[0]: x[1:]})\
>     .reduce(lambda a, b: combine_dict(a, b))
>
> result = sc.textFile('uidAlbumscore.txt')\
>     .map(lambda x: x.split(','))\
>     .filter(lambda x: x[1] in al_tag_dict)\
>     .map(lambda x: (x[0], al_tag_dict[x[1]], float(x[2])))\
>     .map(lambda x: map(lambda a: ((x[0], a), x[2]), x[1]))\
>     .flatMap(lambda x: x)\
>     .map(lambda x: (str(x[0][0]), x[1]))\
>     .reduceByKey(add)
> # .map(lambda x: x[0][0]+','+x[0][1]+','+str(x[1])+'\n')\
> # .reduce(add)
> #codecs.open('tag_score.txt','w','utf-8').write(result)
> print result.first()
> {code}
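A note on the quoted code: on versions without this fix, the usual workaround was to stop capturing the big dict in the lambdas and to ship it with an explicit broadcast instead. A sketch reusing the reporter's file and variable names (untested against the original data):
{code}
# Ship the lookup table once via an explicit broadcast; the lambdas then
# close over the small broadcast handle instead of the big dict itself.
al_tag_bc = sc.broadcast(al_tag_dict)

result = sc.textFile('uidAlbumscore.txt')\
    .map(lambda x: x.split(','))\
    .filter(lambda x: x[1] in al_tag_bc.value)\
    .flatMap(lambda x: [((x[0], a), float(x[2])) for a in al_tag_bc.value[x[1]]])\
    .map(lambda x: (str(x[0][0]), x[1]))\
    .reduceByKey(add)
print result.first()
{code}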
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)