Posted to user@spark.apache.org by Darshan Pandya <da...@gmail.com> on 2017/02/18 03:36:28 UTC

Serialization error - sql UDF related

Hello,

I am getting the famous serialization exception when running the code below:

val correctColNameUDF = udf(getNewColumnName(_: String, false: Boolean): String);
val charReference: DataFrame = thinLong
  .select("char_name_id", "char_name")
  .withColumn("columnNameInDimTable", correctColNameUDF(col("char_name")))
  .withColumn("applicable_dimension", lit(dimension).cast(StringType))
  .distinct();
val charReferenceTableName: String = s"""$TargetSFFDBName.fg_char_reference"""
val tableName: String = charReferenceTableName.toString
charReference.saveAsTable(tableName, saveMode)

I think it has something to do with the UDF, so I am pasting the UDF
function as well

def getNewColumnName(oldColName: String, appendID: Boolean): String = {
  var newColName = oldColName.replaceAll("\\s", "_").replaceAll("%", "_pct").replaceAllLiterally("#", "No")
  return newColName;
}


Exception seen is:

Caused by: org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2066)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 73 more
Caused by: java.io.NotSerializableException: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$
Serialization stack:
- object not serializable (class: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$, value: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$@247a8411)
- field (class: com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1, name: $outer, type: interface com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits)
- object (class com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(char_name#3))
- field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Alias, UDF(char_name#3) AS columnNameInDimTable#304)
- element of array (index: 2)
- array (class [Ljava.lang.Object;, size 4)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(char_name_id#2, char_name#3, UDF(char_name#3) AS columnNameInDimTable#304, PRODUCT AS applicable_dimension#305))
- field (class: org.apache.spark.sql.execution.Project, name: projectList, type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.execution.Project, Project [char_name_id#2,char_name#3,UDF(char_name#3) AS columnNameInDimTable#304,PRODUCT AS applicable_dimension#305]



-- 
Sincerely,
Darshan

Re: Serialization error - sql UDF related

Posted by vaquar khan <va...@gmail.com>.
Hi Darshan,


When you get an org.apache.spark.SparkException: Task not serializable
exception, it means that you are using a reference to an instance of a
non-serializable class inside a transformation.

I hope the following link will help:

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
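
For illustration, here is a minimal sketch of the pattern that produces this exception. The trait and object names are invented, not taken from your code; the point is that a UDF built from a method defined in a trait or class captures the enclosing instance, so Spark has to serialize that instance too:

import org.apache.spark.sql.functions.udf

// Hypothetical reconstruction of the failing pattern (names invented).
trait MyTransformations {
  def getNewColumnName(oldColName: String, appendID: Boolean): String =
    oldColName.replaceAll("\\s", "_")

  // The lambda calls a method on `this`, so the closure captures the
  // enclosing instance: whatever object mixes in this trait.
  val correctColNameUDF = udf((s: String) => getNewColumnName(s, false))
}

// The application object does not extend Serializable, so any task that
// ships the UDF closure fails with "Task not serializable".
object MyApp extends MyTransformations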


Regards,
Vaquar khan


-- 
Regards,
Vaquar Khan
+1 -224-436-0783

IT Architect / Lead Consultant
Greater Chicago

Re: Serialization error - sql UDF related

Posted by Yong Zhang <ja...@hotmail.com>.
You defined "getNewColumnName" as a method, which requires the class/object holding it to be serializable.

From the stack trace, it looks like this method is defined in ProductDimensionSFFConverterRealApp, which is not serializable.


In fact, your method only uses String and Boolean, which are serializable by default, so you can change the definition from a method (def) to a function value (val) instead, which should work.
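
A minimal sketch of that change, reusing your replacement logic. The wiring is my assumption, and the two vals are assumed to be local (e.g. inside your main method), so the UDF closure captures only the function value and not the enclosing object:

import org.apache.spark.sql.functions.udf

// A function value instead of a method. The literal touches only its own
// parameters, so it captures no enclosing instance and is serializable
// on its own.
val getNewColumnName: (String, Boolean) => String = (oldColName, appendID) =>
  oldColName.replaceAll("\\s", "_")
    .replaceAll("%", "_pct")
    .replaceAllLiterally("#", "No")

// Partially apply the second argument. As a local val, getNewColumnName
// is captured by value, so no outer reference sneaks into the closure.
val correctColNameUDF = udf(getNewColumnName(_: String, false))

Alternatively, moving the method into a small standalone object that extends Serializable would also work.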


Yong

