Posted to user@spark.apache.org by sahanbull <sa...@skimlinks.com> on 2014/12/08 15:38:13 UTC

Error when mapping a schema RDD when converting lists

Hi Guys, 

I used applySchema to store a set of nested dictionaries and lists in a
parquet file. 

http://apache-spark-user-list.1001560.n3.nabble.com/Using-sparkSQL-to-convert-a-collection-of-python-dictionary-of-dictionaries-to-schma-RDD-td20228.html#a20461

It was successful, and I could load the data back as well. Now I'm
trying to convert this SchemaRDD to an RDD of dictionaries so that I can run
some reduces on them.

The schema of my RDD is as follows:
 |-- field1: string (nullable = true)
 |-- field2: integer (nullable = true)
 |-- field3: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = true)
 |-- field4: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)
 |-- field5: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- field6: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- field61: string (nullable = true)
 |    |    |-- field62: string (nullable = true)
 |    |    |-- field63: integer (nullable = true)

And I'm using the following mapper to map these fields to an RDD that I can
reduce later.

def generateRecords(line):
	# input : the row stored in parquet file
	# output : a python dictionary with all the key value pairs
	field1 = line.field1
	summary = {}
	summary['field2'] = line.field2
	summary['field3'] = line.field3
	summary['field4'] = line.field4
	summary['field5'] = line.field5
	summary['field6'] = line.field6
	return (field1, summary)

profiles = sqc.parquetFile(path)
profileRecords = profiles.map(lambda line: generateRecords(line))

This code works perfectly well when field6 is not mapped, i.e. when you
comment out the line that maps field6 in generateRecords; the RDD gets
generated perfectly. Even field5 gets mapped. The key difference between
field5 and field6 is that field5 is a list of strings and field6 is a list
of tuples of the form (String, String, Int). But when you try to map field6,
it throws:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/root/spark/python/pyspark/rdd.py", line 847, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/root/spark/python/pyspark/rdd.py", line 838, in sum
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/root/spark/python/pyspark/rdd.py", line 759, in reduce
    vals = self.mapPartitions(func).collect()
  File "/root/spark/python/pyspark/rdd.py", line 723, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o88.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 32 in stage 3.0 failed 4 times, most recent failure: Lost task 32.3 in stage 3.0 (TID 1829, ip-172-31-18-36.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/spark/python/pyspark/worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/root/spark/python/pyspark/serializers.py", line 128, in dump_stream
    self._write_with_length(obj, stream)
  File "/root/spark/python/pyspark/serializers.py", line 138, in _write_with_length
    serialized = self.dumps(obj)
  File "/root/spark/python/pyspark/serializers.py", line 356, in dumps
    return cPickle.dumps(obj, 2)
PicklingError: Can't pickle <class 'pyspark.sql.List'>: attribute lookup pyspark.sql.List failed

Can someone help me understand what is going wrong here?

Many thanks
SahanB




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-mapping-a-schema-RDD-when-converting-lists-tp20577.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Error when mapping a schema RDD when converting lists

Posted by Davies Liu <da...@databricks.com>.
This is fixed in 1.2. Also, in 1.2+ you could call row.asDict() to
convert the Row object into a dict.
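
A minimal sketch of how the mapper from the original post might look once
row.asDict() is available. FakeRow is a hypothetical stand-in that exists only
so the snippet runs without a Spark cluster, and generate_records is an
illustrative name; neither comes from Spark. Whether asDict() also converts
nested values such as the structs inside field6 may depend on the Spark
version, so check that before relying on it.

```python
class FakeRow(object):
    """Hypothetical stand-in for a Spark Row (assumption, not Spark code)."""

    def __init__(self, **fields):
        self._fields = dict(fields)

    def asDict(self):
        # Mirrors only the idea of Row.asDict(): field name -> value.
        return dict(self._fields)


def generate_records(row):
    """Turn one row into a (key, summary-dict) pair for later reduces."""
    summary = row.asDict()
    key = summary.pop('field1')  # field1 becomes the key, the rest the value
    return (key, summary)


# Usage with the stand-in row; with Spark this would be profiles.map(generate_records).
row = FakeRow(field1='k1', field2=7, field5=['x', 'y'], field6=[('a', 'b', 1)])
record = generate_records(row)
```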



Re: Error when mapping a schema RDD when converting lists

Posted by sahanbull <sa...@skimlinks.com>.
As a temporary fix, it works when I convert field6 to a list manually. That
is:

def generateRecords(line): 
        # input : the row stored in parquet file 
        # output : a python dictionary with all the key value pairs 
        field1 = line.field1 
        summary = {} 
        summary['field2'] = line.field2 
        summary['field3'] = line.field3 
        summary['field4'] = line.field4 
        summary['field5'] = line.field5 
        summary['field6'] = list(line.field6)  # plain list instead of pyspark.sql.List
        return (field1,summary) 

profiles = sqc.parquetFile(path) 
profileRecords = profiles.map(lambda line: generateRecords(line)) 

Works!!

But I am not convinced this is the best I could do :) 
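
A possible reason the list() conversion helps, sketched in plain Python
without Spark. The message "attribute lookup pyspark.sql.List failed" means
pickle could not find the class again under that name. The snippet below
reproduces that kind of failure with a list subclass created inside a
function. Whether Spark 1.1 builds pyspark.sql.List in this way is an
assumption here, and make_runtime_list_class and can_pickle are hypothetical
helper names.

```python
import pickle


def make_runtime_list_class():
    # Hypothetical stand-in: a list subclass defined inside a function, so
    # pickle cannot locate it again as a module-level name.
    class List(list):
        pass
    return List


def can_pickle(obj):
    """Return True if obj can be serialized with pickle protocol 2."""
    try:
        pickle.dumps(obj, 2)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


RuntimeList = make_runtime_list_class()
wrapped = RuntimeList([("a", "b", 1), ("c", "d", 2)])
plain = list(wrapped)  # same elements, but the built-in list type

wrapped_ok = can_pickle(wrapped)  # pickling needs the class, which is not importable
plain_ok = can_pickle(plain)      # built-in list of tuples pickles normally
```

Copying into a built-in list drops the runtime-created class while keeping the
elements, which is consistent with the workaround above.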

Cheers
SahanB



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-mapping-a-schema-RDD-when-converting-lists-tp20577p20579.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
