Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/07/08 18:34:10 UTC

[jira] [Commented] (SPARK-16449) unionAll raises "Task not serializable"

    [ https://issues.apache.org/jira/browse/SPARK-16449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15368143#comment-15368143 ] 

Sean Owen commented on SPARK-16449:
-----------------------------------

Interesting. It looks like something is serializing a Scala Iterator, which can never work: iterators are not serializable.

The relevant subset of the trace is below. It shows that LocalTableScan's rows field is a lazily-built Stream whose unevaluated tail thunks still reference the iterator they were created from. Hm, maybe something in LocalTableScan can be rejiggered to avoid holding rows that way. (A standalone repro of the capture follows the trace.)

{code}
Caused by: java.io.NotSerializableException: scala.collection.Iterator$$anon$11
Serialization stack:
	- object not serializable (class: scala.collection.Iterator$$anon$11, value: empty iterator)
	- field (class: scala.collection.Iterator$$anonfun$toStream$1, name: $outer, type: interface scala.collection.Iterator)
	- object (class scala.collection.Iterator$$anonfun$toStream$1, <function0>)
	- field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0)
	- object (class scala.collection.immutable.Stream$Cons, Stream(WrappedArray(3526154, 3526154, 1580402, 3526154, 3526154), WrappedArray(5.50388599500189E11, 4.178168090221903, 234846.780654818, 5.134865351881966, 354.7084951479714), WrappedArray(2.596112361975223E11, 0.34382335723646484, 118170.68592261613, 3.3833930336063456, 4.011812510792076), WrappedArray(100002091588, 2.75, 0.85, -1, 292), WrappedArray(999995696635, 6.125, 1193544.39, 34, 480)))
	- field (class: scala.collection.immutable.Stream$$anonfun$zip$1, name: $outer, type: class scala.collection.immutable.Stream)
	- object (class scala.collection.immutable.Stream$$anonfun$zip$1, <function0>)
	- field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0)
	- object (class scala.collection.immutable.Stream$Cons, Stream((WrappedArray(3526154, 3526154, 1580402, 3526154, 3526154),(count,<function1>)), (WrappedArray(5.50388599500189E11, 4.178168090221903, 234846.780654818, 5.134865351881966, 354.7084951479714),(mean,<function1>)), (WrappedArray(2.596112361975223E11, 0.34382335723646484, 118170.68592261613, 3.3833930336063456, 4.011812510792076),(stddev,<function1>)), (WrappedArray(100002091588, 2.75, 0.85, -1, 292),(min,<function1>)), (WrappedArray(999995696635, 6.125, 1193544.39, 34, 480),(max,<function1>))))
	- field (class: scala.collection.immutable.Stream$$anonfun$map$1, name: $outer, type: class scala.collection.immutable.Stream)
	- object (class scala.collection.immutable.Stream$$anonfun$map$1, <function0>)
	- field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0)
	- object (class scala.collection.immutable.Stream$Cons, Stream([count,3526154,3526154,1580402,3526154,3526154], [mean,5.50388599500189E11,4.178168090221903,234846.780654818,5.134865351881966,354.7084951479714], [stddev,2.596112361975223E11,0.34382335723646484,118170.68592261613,3.3833930336063456,4.011812510792076], [min,100002091588,2.75,0.85,-1,292], [max,999995696635,6.125,1193544.39,34,480]))
	- field (class: scala.collection.immutable.Stream$$anonfun$map$1, name: $outer, type: class scala.collection.immutable.Stream)
	- object (class scala.collection.immutable.Stream$$anonfun$map$1, <function0>)
	- field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0)
	- object (class scala.collection.immutable.Stream$Cons, Stream([count,3526154,3526154,1580402,3526154,3526154], [mean,5.50388599500189E11,4.178168090221903,234846.780654818,5.134865351881966,354.7084951479714], [stddev,2.596112361975223E11,0.34382335723646484,118170.68592261613,3.3833930336063456,4.011812510792076], [min,100002091588,2.75,0.85,-1,292], [max,999995696635,6.125,1193544.39,34,480]))
	- field (class: org.apache.spark.sql.execution.LocalTableScan, name: rows, type: interface scala.collection.Seq)
	- object (class org.apache.spark.sql.execution.LocalTableScan, LocalTableScan [summary#228,C0#229,C3#230,C4#231,C5#232,C6#233], [[count,3526154,3526154,1580402,3526154,3526154],[mean,5.50388599500189E11,4.178168090221903,234846.780654818,5.134865351881966,354.7084951479714],[stddev,2.596112361975223E11,0.34382335723646484,118170.68592261613,3.3833930336063456,4.011812510792076],[min,100002091588,2.75,0.85,-1,292],[max,999995696635,6.125,1193544.39,34,480]]
)
	- field (class: org.apache.spark.sql.execution.ConvertToUnsafe, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
	- object (class org.apache.spark.sql.execution.ConvertToUnsafe, ConvertToUnsafe
+- LocalTableScan [summary#228,C0#229,C3#230,C4#231,C5#232,C6#233], [[count,3526154,3526154,1580402,3526154,3526154],[mean,5.50388599500189E11,4.178168090221903,234846.780654818,5.134865351881966,354.7084951479714],[stddev,2.596112361975223E11,0.34382335723646484,118170.68592261613,3.3833930336063456,4.011812510792076],[min,100002091588,2.75,0.85,-1,292],[max,999995696635,6.125,1193544.39,34,480]]
)
	- field (class: org.apache.spark.sql.execution.ConvertToUnsafe$$anonfun$1, name: $outer, type: class org.apache.spark.sql.execution.ConvertToUnsafe)
	- object (class org.apache.spark.sql.execution.ConvertToUnsafe$$anonfun$1, <function1>)
{code}
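
The capture is easy to reproduce outside Spark. Here is a minimal sketch (assuming Scala 2.10/2.11, which Spark 1.6 builds against; the class it fails on matches the trace above):

{code}
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object StreamCaptureDemo extends App {
  def trySerialize(label: String, obj: AnyRef): Unit =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      println(s"$label: serialized OK")
    } catch {
      case e: NotSerializableException =>
        println(s"$label: NotSerializableException: ${e.getMessage}")
    }

  // Iterator.toStream leaves the tail unevaluated: each Stream$Cons holds a
  // Function0 thunk whose $outer is the Iterator itself, and Iterator is not
  // serializable. That is the Iterator$$anonfun$toStream$1 link in the trace.
  val lazyRows: Stream[Int] = Iterator(1, 2, 3).toStream
  trySerialize("lazy Stream", lazyRows)           // fails, as in the trace

  // Fully materializing the rows drops the reference to the iterator.
  trySerialize("materialized rows", lazyRows.toList)  // succeeds
}
{code}

So if the rows handed to LocalTableScan arrive as such a Stream, forcing them into a strict collection before the plan is shipped should avoid this.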

> unionAll raises "Task not serializable"
> ---------------------------------------
>
>                 Key: SPARK-16449
>                 URL: https://issues.apache.org/jira/browse/SPARK-16449
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.6.1
>         Environment: AWS EMR, Jupyter notebook
>            Reporter: Jeff Levy
>            Priority: Minor
>
> Goal: Take the output of `describe` on a large DataFrame, use a loop to compute `skewness` and `kurtosis` from pyspark.sql.functions for each column, build those results into a two-row DataFrame, and then use `unionAll` to merge the two DataFrames.
> Issue: Despite the two DataFrames having the same column names, in the same order, with the same dtypes, the `unionAll` fails with "Task not serializable". However, if I build the two rows from dummy test data, `unionAll` works fine. Likewise, if I collect my results and then turn them straight back into DataFrames, `unionAll` succeeds (a sketch of that workaround follows this description).
> Step-by-step code and output with comments can be seen here: https://github.com/UrbanInstitute/pyspark-tutorials/blob/master/unionAll%20error.ipynb
> The issue appears to be in how the loop in code block 6 builds the rows before parallelizing, but the resulting rows look no different from the test rows that do work. I reproduced this on multiple datasets, so downloading the notebook and pointing it at any data of your own should replicate it.
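> A minimal sketch of that collect-and-rebuild workaround (shown in Scala to match the trace above; `df`, `extraStats`, `sc`, and `sqlContext` stand in for my actual objects):
> {code}
> val desc = df.describe()  // summary stats; its plan holds locally-built rows
>
> // Round-trip the rows through collect() so the rebuilt DataFrame is backed
> // by an RDD of materialized rows rather than a lazily-built local Stream:
> val descMaterialized =
>   sqlContext.createDataFrame(sc.parallelize(desc.collect().toSeq), desc.schema)
>
> val merged = descMaterialized.unionAll(extraStats)  // extraStats: the skewness/kurtosis rows
> {code}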



