You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2020/05/06 11:52:00 UTC

[jira] [Assigned] (SPARK-31399) Closure cleaner broken in Scala 2.12

     [ https://issues.apache.org/jira/browse/SPARK-31399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-31399:
------------------------------------

    Assignee: Apache Spark  (was: Kris Mok)

> Closure cleaner broken in Scala 2.12
> ------------------------------------
>
>                 Key: SPARK-31399
>                 URL: https://issues.apache.org/jira/browse/SPARK-31399
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.5, 3.0.0
>            Reporter: Wenchen Fan
>            Assignee: Apache Spark
>            Priority: Blocker
>
> The `ClosureCleaner` only supports Scala functions, and it uses the following check to detect closures:
> {code}
>   // Check whether a class represents a Scala closure
>   private def isClosure(cls: Class[_]): Boolean = {
>     cls.getName.contains("$anonfun$")
>   }
> {code}
> This no longer works in 3.0, because we upgraded to Scala 2.12 and most Scala functions are now compiled to Java lambdas.
> As an example, the following code works well in the Spark 2.4 shell:
> {code}
> scala> :pa
> // Entering paste mode (ctrl-D to finish)
> import org.apache.spark.sql.functions.lit
> case class Foo(id: String)
> val col = lit("123")
> val df = sc.range(0,10,1,1).map { _ => Foo("") }
> // Exiting paste mode, now interpreting.
> import org.apache.spark.sql.functions.lit
> defined class Foo
> col: org.apache.spark.sql.Column = 123
> df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[5] at map at <pastie>:20
> {code}
> But it fails in 3.0:
> {code}
> scala> :pa
> // Entering paste mode (ctrl-D to finish)
> import org.apache.spark.sql.functions.lit
> case class Foo(id: String)
> val col = lit("123")
> val df = sc.range(0,10,1,1).map { _ => Foo("") }
> // Exiting paste mode, now interpreting.
> org.apache.spark.SparkException: Task not serializable
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2371)
>   at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
>   at org.apache.spark.rdd.RDD.map(RDD.scala:421)
>   ... 39 elided
> Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
> Serialization stack:
> 	- object not serializable (class: org.apache.spark.sql.Column, value: 123)
> 	- field (class: $iw, name: col, type: class org.apache.spark.sql.Column)
> 	- object (class $iw, $iw@2d87ac2b)
> 	- element of array (index: 0)
> 	- array (class [Ljava.lang.Object;, size 1)
> 	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
> 	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
> 	- writeReplace data (class: java.lang.invoke.SerializedLambda)
> 	- object (class $Lambda$2438/170049100, $Lambda$2438/170049100@d6b8c43)
>   at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
>   ... 47 more
> {code}
> **Apache Spark 2.4.5 with Scala 2.12**
> {code}
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
>       /_/
> Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
> Type in expressions to have them evaluated.
> Type :help for more information.
> scala> :pa
> // Entering paste mode (ctrl-D to finish)
> import org.apache.spark.sql.functions.lit
> case class Foo(id: String)
> val col = lit("123")
> val df = sc.range(0,10,1,1).map { _ => Foo("") }
> // Exiting paste mode, now interpreting.
> org.apache.spark.SparkException: Task not serializable
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
>   at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:393)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
>   at org.apache.spark.rdd.RDD.map(RDD.scala:392)
>   ... 45 elided
> Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
> Serialization stack:
> 	- object not serializable (class: org.apache.spark.sql.Column, value: 123)
> 	- field (class: $iw, name: col, type: class org.apache.spark.sql.Column)
> 	- object (class $iw, $iw@73534675)
> 	- element of array (index: 0)
> 	- array (class [Ljava.lang.Object;, size 1)
> 	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
> 	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
> 	- writeReplace data (class: java.lang.invoke.SerializedLambda)
> 	- object (class $Lambda$1952/356563238, $Lambda$1952/356563238@6ca95b1e)
>   at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
>   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
>   ... 53 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org