Posted to issues@spark.apache.org by "Kan Zhang (JIRA)" <ji...@apache.org> on 2014/05/06 06:39:14 UTC

[jira] [Assigned] (SPARK-1687) Support NamedTuples in RDDs

     [ https://issues.apache.org/jira/browse/SPARK-1687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kan Zhang reassigned SPARK-1687:
--------------------------------

    Assignee: Kan Zhang

> Support NamedTuples in RDDs
> ---------------------------
>
>                 Key: SPARK-1687
>                 URL: https://issues.apache.org/jira/browse/SPARK-1687
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>    Affects Versions: 1.0.0
>         Environment: Spark version 1.0.0-SNAPSHOT
> Python 2.7.5
>            Reporter: Pat McDonough
>            Assignee: Kan Zhang
>
> Add support for NamedTuples in RDDs. In the sample code below, collect() succeeds but count() fails; the error it currently produces follows the code.
> Based on a quick conversation with [~ahirreddy], [Dill|https://github.com/uqfoundation/dill] might be a good solution here.
> {code}
> In [26]: from collections import namedtuple
> ...
> In [33]: Person = namedtuple('Person', 'id firstName lastName')
> In [34]: jon = Person(1, "Jon", "Doe")
> In [35]: jane = Person(2, "Jane", "Doe")
> In [36]: theDoes = sc.parallelize((jon, jane))
> In [37]: theDoes.collect()
> Out[37]: 
> [Person(id=1, firstName='Jon', lastName='Doe'),
>  Person(id=2, firstName='Jane', lastName='Doe')]
> In [38]: theDoes.count()
> PySpark worker failed with exception:
> PySpark worker failed with exception:
> Traceback (most recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
> Traceback (most recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
> 14/04/30 14:43:53 ERROR Executor: Exception in task ID 23
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
> 	at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
> 	at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
> 	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:51)
> 	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
> 	at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:744)
> 14/04/30 14:43:53 ERROR Executor: Exception in task ID 21
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
> 	at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
> 	at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
> 	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:51)
> 	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
> 	at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:744)
> 14/04/30 14:43:53 ERROR TaskSetManager: Task 5.0:3 failed 1 times; aborting job
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-38-8689b264fa46> in <module>()
> ----> 1 theDoes.count()
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in count(self)
>     706         3
>     707         """
> --> 708         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>     709 
>     710     def stats(self):
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in sum(self)
>     697         6.0
>     698         """
> --> 699         return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
>     700 
>     701     def count(self):
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in reduce(self, f)
>     617             if acc is not None:
>     618                 yield acc
> --> 619         vals = self.mapPartitions(func).collect()
>     620         return reduce(f, vals)
>     621 
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in collect(self)
>     581         """
>     582         with _JavaStackTrace(self.context) as st:
> --> 583           bytesInJava = self._jrdd.collect().iterator()
>     584         return list(self._collect_iterator_through_file(bytesInJava))
>     585 
> /Users/pat/Projects/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
>     535         answer = self.gateway_client.send_command(command)
>     536         return_value = get_return_value(answer, self.gateway_client,
> --> 537                 self.target_id, self.name)
>     538 
>     539         for temp_arg in temp_args:
> /Users/pat/Projects/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
> Py4JJavaError: An error occurred while calling o53.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5.0:3 failed 1 times, most recent failure: Exception failure in TID 23 on host localhost: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
>     return func(split, prev_func(split, iterator))
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
>     def func(s, iterator): return f(iterator)
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
>     yield self._read_with_length(stream)
>   File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
>     return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
>         org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
>         org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
>         org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>         org.apache.spark.scheduler.Task.run(Task.scala:51)
>         org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
>         org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:744)
> Driver stacktrace:
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> 	at scala.Option.foreach(Option.scala:236)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
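
The AttributeError in the traceback above is consistent with how pickle handles class instances: it stores a reference to the class (module name plus class name) rather than the class definition, so the loading process must be able to find `Person` itself. The following is a minimal sketch of that behaviour using only the standard library, with no Spark involved. The module name `driver_session` is a hypothetical stand-in for the interactive driver namespace, and deleting the attribute only simulates a worker that never saw the definition. It assumes Python 3.6 or later (for the `module` argument of `namedtuple`), not the Python 2.7.5 from the report. The plain-tuple round trip at the end is one possible workaround, not the fix adopted for this issue.

```python
import pickle
import sys
import types
from collections import namedtuple

# Hypothetical stand-in for the driver's interactive namespace.
session = types.ModuleType("driver_session")
sys.modules["driver_session"] = session

# Define the namedtuple as if it lived in that namespace.
Person = namedtuple("Person", "id firstName lastName", module="driver_session")
session.Person = Person

jon = Person(1, "Jon", "Doe")

# pickle records the class by name ("driver_session", "Person"),
# not by value, so the payload does not carry the class definition.
payload = pickle.dumps(jon, protocol=2)

# Simulate a worker process in which Person was never defined.
del session.Person

try:
    pickle.loads(payload)
    worker_error = None
except AttributeError as exc:
    worker_error = exc

print("worker-side load failed:", worker_error)

# Possible workaround: ship plain tuples, which need no class lookup,
# and rebuild the namedtuple where the class is known (here, the driver).
plain_payload = pickle.dumps(tuple(jon), protocol=2)
restored = Person._make(pickle.loads(plain_payload))
print(restored)
```

A serializer that pickles classes by value, such as the Dill suggestion in the description, would avoid the lookup altogether; the sketch above only illustrates why the default lookup fails.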


