Posted to user@spark.apache.org by sahanbull <sa...@skimlinks.com> on 2014/12/08 15:38:13 UTC
Error when mapping a schema RDD when converting lists
Hi Guys,
I used applySchema to store a set of nested dictionaries and lists in a
parquet file.
http://apache-spark-user-list.1001560.n3.nabble.com/Using-sparkSQL-to-convert-a-collection-of-python-dictionary-of-dictionaries-to-schma-RDD-td20228.html#a20461
It was successful and I could load the data back as well. Now I'm
trying to convert this SchemaRDD to an RDD of dictionaries so that I can run
some reduces on them.
The schema of my RDD is as follows:
|-- field1: string (nullable = true)
|-- field2: integer (nullable = true)
|-- field3: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = true)
|-- field4: map (nullable = true)
| |-- key: string
| |-- value: integer (valueContainsNull = true)
|-- field5: array (nullable = true)
| |-- element: string (containsNull = true)
|-- field6: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- field61: string (nullable = true)
| | |-- field62: string (nullable = true)
| | |-- field63: integer (nullable = true)
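(For reference, the StructType behind this schema would look roughly like the sketch below; it is reconstructed from the printSchema output above and untested. rowRDD stands for the plain RDD of records being converted, and on Spark 1.3+ the type classes live in pyspark.sql.types instead of pyspark.sql.)

from pyspark.sql import SQLContext, StructType, StructField, \
    StringType, IntegerType, MapType, ArrayType

schema = StructType([
    StructField("field1", StringType(), True),
    StructField("field2", IntegerType(), True),
    StructField("field3", MapType(IntegerType(), IntegerType(), True), True),
    StructField("field4", MapType(StringType(), IntegerType(), True), True),
    StructField("field5", ArrayType(StringType(), True), True),
    StructField("field6", ArrayType(StructType([
        StructField("field61", StringType(), True),
        StructField("field62", StringType(), True),
        StructField("field63", IntegerType(), True),
    ]), True), True),
])

sqc = SQLContext(sc)
srdd = sqc.applySchema(rowRDD, schema)   # rowRDD: the plain RDD of records
srdd.saveAsParquetFile(path)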
And I'm using the following mapper to map these fields to an RDD that I can
reduce later:
def generateRecords(line):
    # input : the row stored in the parquet file
    # output : a (field1, dict-of-fields) pair
    field1 = line.field1
    summary = {}
    summary['field2'] = line.field2
    summary['field3'] = line.field3
    summary['field4'] = line.field4
    summary['field5'] = line.field5
    summary['field6'] = line.field6
    return (field1, summary)
profiles = sqc.parquetFile(path)
profileRecords = profiles.map(lambda line: generateRecords(line))
This code works perfectly well when field6 is not mapped, i.e. when you
comment out the line that maps field6 in generateRecords, the RDD gets
generated perfectly. Even field5 gets mapped. The key difference between
field5 and field6 is that field5 is a list of strings while field6 is a list
of structs of the form (String, String, Int). But when you try to map field6,
it throws:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/root/spark/python/pyspark/rdd.py", line 847, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/root/spark/python/pyspark/rdd.py", line 838, in sum
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/root/spark/python/pyspark/rdd.py", line 759, in reduce
    vals = self.mapPartitions(func).collect()
  File "/root/spark/python/pyspark/rdd.py", line 723, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o88.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 32 in stage 3.0 failed 4 times, most recent failure: Lost task 32.3 in stage 3.0 (TID 1829, ip-172-31-18-36.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/spark/python/pyspark/worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/root/spark/python/pyspark/serializers.py", line 128, in dump_stream
    self._write_with_length(obj, stream)
  File "/root/spark/python/pyspark/serializers.py", line 138, in _write_with_length
    serialized = self.dumps(obj)
  File "/root/spark/python/pyspark/serializers.py", line 356, in dumps
    return cPickle.dumps(obj, 2)
PicklingError: Can't pickle <class 'pyspark.sql.List'>: attribute lookup pyspark.sql.List failed
Can someone help me understand what is going wrong here?
Many thanks
SahanB
Re: Error when mapping a schema RDD when converting lists
Posted by Davies Liu <da...@databricks.com>.
This is fixed in 1.2. Also, in 1.2+ you could call row.asDict() to
convert the Row object into a dict.
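For example, on 1.2+ the whole mapper could boil down to something like this (untested sketch, reusing the names from your code):

def generateRecords(row):
    # asDict() gives a plain Python dict of the row's fields, which pickles fine
    summary = row.asDict()
    # keep field1 as the key and the remaining fields as the value
    field1 = summary.pop('field1')
    return (field1, summary)

profiles = sqc.parquetFile(path)
profileRecords = profiles.map(generateRecords)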
Re: Error when mapping a schema RDD when converting lists
Posted by sahanbull <sa...@skimlinks.com>.
As a temporary fix, it works when I convert field6 to a list manually. That
is:
def generateRecords(line):
    # input : the row stored in the parquet file
    # output : a (field1, dict-of-fields) pair
    field1 = line.field1
    summary = {}
    summary['field2'] = line.field2
    summary['field3'] = line.field3
    summary['field4'] = line.field4
    summary['field5'] = line.field5
    # converting to a plain Python list avoids the unpicklable pyspark.sql.List wrapper
    summary['field6'] = list(line.field6)
    return (field1, summary)
profiles = sqc.parquetFile(path)
profileRecords = profiles.map(lambda line: generateRecords(line))
Works!!
But I am not convinced this is the best I could do :)
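For what it's worth, the reduce I'm after would then look roughly like this (untested sketch, reusing the names from the code above; totalField6 is just an illustrative name):

profileRecords = profiles.map(generateRecords)
# exercise the full map -> pickle -> reduce path that used to blow up:
# count the total number of field6 entries across all records
totalField6 = profileRecords \
    .map(lambda (field1, summary): len(summary['field6'] or [])) \
    .reduce(lambda a, b: a + b)
print totalField6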
Cheers
SahanB