You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Kan Zhang (JIRA)" <ji...@apache.org> on 2014/05/06 06:39:14 UTC
[jira] [Assigned] (SPARK-1687) Support NamedTuples in RDDs
[ https://issues.apache.org/jira/browse/SPARK-1687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kan Zhang reassigned SPARK-1687:
--------------------------------
Assignee: Kan Zhang
> Support NamedTuples in RDDs
> ---------------------------
>
> Key: SPARK-1687
> URL: https://issues.apache.org/jira/browse/SPARK-1687
> Project: Spark
> Issue Type: Improvement
> Components: PySpark
> Affects Versions: 1.0.0
> Environment: Spark version 1.0.0-SNAPSHOT
> Python 2.7.5
> Reporter: Pat McDonough
> Assignee: Kan Zhang
>
> Add Support for NamedTuples in RDDs. Some sample code is below, followed by the current error that comes from it.
> Based on a quick conversation with [~ahirreddy], [Dill|https://github.com/uqfoundation/dill] might be a good solution here.
> {code}
> In [26]: from collections import namedtuple
> ...
> In [33]: Person = namedtuple('Person', 'id firstName lastName')
> In [34]: jon = Person(1, "Jon", "Doe")
> In [35]: jane = Person(2, "Jane", "Doe")
> In [36]: theDoes = sc.parallelize((jon, jane))
> In [37]: theDoes.collect()
> Out[37]:
> [Person(id=1, firstName='Jon', lastName='Doe'),
> Person(id=2, firstName='Jane', lastName='Doe')]
> In [38]: theDoes.count()
> PySpark worker failed with exception:
> PySpark worker failed with exception:
> Traceback (most recent call last):
> File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
> serializer.dump_stream(func(split_index, iterator), outfile)
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
> return func(split, prev_func(split, iterator))
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
> return func(split, prev_func(split, iterator))
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
> def func(s, iterator): return f(iterator)
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
> yield self._read_with_length(stream)
> File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
> return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
> Traceback (most recent call last):
> File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
> serializer.dump_stream(func(split_index, iterator), outfile)
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
> return func(split, prev_func(split, iterator))
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
> return func(split, prev_func(split, iterator))
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
> def func(s, iterator): return f(iterator)
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
> yield self._read_with_length(stream)
> File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
> return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
> 14/04/30 14:43:53 ERROR Executor: Exception in task ID 23
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
> File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
> serializer.dump_stream(func(split_index, iterator), outfile)
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
> return func(split, prev_func(split, iterator))
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
> return func(split, prev_func(split, iterator))
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
> def func(s, iterator): return f(iterator)
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
> yield self._read_with_length(stream)
> File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
> return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
> at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
> at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> at org.apache.spark.scheduler.Task.run(Task.scala:51)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
> at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> 14/04/30 14:43:53 ERROR Executor: Exception in task ID 21
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
> File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
> serializer.dump_stream(func(split_index, iterator), outfile)
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
> return func(split, prev_func(split, iterator))
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
> return func(split, prev_func(split, iterator))
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
> def func(s, iterator): return f(iterator)
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
> yield self._read_with_length(stream)
> File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
> return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
> at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
> at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> at org.apache.spark.scheduler.Task.run(Task.scala:51)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
> at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> 14/04/30 14:43:53 ERROR TaskSetManager: Task 5.0:3 failed 1 times; aborting job
> ---------------------------------------------------------------------------
> Py4JJavaError Traceback (most recent call last)
> <ipython-input-38-8689b264fa46> in <module>()
> ----> 1 theDoes.count()
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in count(self)
> 706 3
> 707 """
> --> 708 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> 709
> 710 def stats(self):
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in sum(self)
> 697 6.0
> 698 """
> --> 699 return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
> 700
> 701 def count(self):
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in reduce(self, f)
> 617 if acc is not None:
> 618 yield acc
> --> 619 vals = self.mapPartitions(func).collect()
> 620 return reduce(f, vals)
> 621
> /Users/pat/Projects/spark/python/pyspark/rdd.pyc in collect(self)
> 581 """
> 582 with _JavaStackTrace(self.context) as st:
> --> 583 bytesInJava = self._jrdd.collect().iterator()
> 584 return list(self._collect_iterator_through_file(bytesInJava))
> 585
> /Users/pat/Projects/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
> 535 answer = self.gateway_client.send_command(command)
> 536 return_value = get_return_value(answer, self.gateway_client,
> --> 537 self.target_id, self.name)
> 538
> 539 for temp_arg in temp_args:
> /Users/pat/Projects/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
> 298 raise Py4JJavaError(
> 299 'An error occurred while calling {0}{1}{2}.\n'.
> --> 300 format(target_id, '.', name), value)
> 301 else:
> 302 raise Py4JError(
> Py4JJavaError: An error occurred while calling o53.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5.0:3 failed 1 times, most recent failure: Exception failure in TID 23 on host localhost: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
> File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
> serializer.dump_stream(func(split_index, iterator), outfile)
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
> return func(split, prev_func(split, iterator))
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
> return func(split, prev_func(split, iterator))
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
> def func(s, iterator): return f(iterator)
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
> yield self._read_with_length(stream)
> File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
> return self.loads(obj)
> AttributeError: 'module' object has no attribute 'Person'
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> org.apache.spark.scheduler.Task.run(Task.scala:51)
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:744)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)