You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2020/05/06 11:52:00 UTC
[jira] [Assigned] (SPARK-31399) Closure cleaner broken in Scala
2.12
[ https://issues.apache.org/jira/browse/SPARK-31399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-31399:
------------------------------------
Assignee: Apache Spark (was: Kris Mok)
> Closure cleaner broken in Scala 2.12
> ------------------------------------
>
> Key: SPARK-31399
> URL: https://issues.apache.org/jira/browse/SPARK-31399
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.5, 3.0.0
> Reporter: Wenchen Fan
> Assignee: Apache Spark
> Priority: Blocker
>
> The `ClosureCleaner` only supports Scala functions, and it uses the following check to catch closures:
> {code}
> // Check whether a class represents a Scala closure
> private def isClosure(cls: Class[_]): Boolean = {
> cls.getName.contains("$anonfun$")
> }
> {code}
> This doesn't work in 3.0 any more, as we upgraded to Scala 2.12 and most Scala functions became Java lambdas.
> As an example, the following code works well in Spark 2.4 Spark Shell:
> {code}
> scala> :pa
> // Entering paste mode (ctrl-D to finish)
> import org.apache.spark.sql.functions.lit
> case class Foo(id: String)
> val col = lit("123")
> val df = sc.range(0,10,1,1).map { _ => Foo("") }
> // Exiting paste mode, now interpreting.
> import org.apache.spark.sql.functions.lit
> defined class Foo
> col: org.apache.spark.sql.Column = 123
> df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[5] at map at <pastie>:20
> {code}
> But it fails in 3.0:
> {code}
> scala> :pa
> // Entering paste mode (ctrl-D to finish)
> import org.apache.spark.sql.functions.lit
> case class Foo(id: String)
> val col = lit("123")
> val df = sc.range(0,10,1,1).map { _ => Foo("") }
> // Exiting paste mode, now interpreting.
> org.apache.spark.SparkException: Task not serializable
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2371)
> at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
> at org.apache.spark.rdd.RDD.map(RDD.scala:421)
> ... 39 elided
> Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
> Serialization stack:
> - object not serializable (class: org.apache.spark.sql.Column, value: 123)
> - field (class: $iw, name: col, type: class org.apache.spark.sql.Column)
> - object (class $iw, $iw@2d87ac2b)
> - element of array (index: 0)
> - array (class [Ljava.lang.Object;, size 1)
> - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
> - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
> - writeReplace data (class: java.lang.invoke.SerializedLambda)
> - object (class $Lambda$2438/170049100, $Lambda$2438/170049100@d6b8c43)
> at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
> at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
> ... 47 more
> {code}
> **Apache Spark 2.4.5 with Scala 2.12**
> {code}
> Welcome to
> ____ __
> / __/__ ___ _____/ /__
> _\ \/ _ \/ _ `/ __/ '_/
> /___/ .__/\_,_/_/ /_/\_\ version 2.4.5
> /_/
> Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
> Type in expressions to have them evaluated.
> Type :help for more information.
> scala> :pa
> // Entering paste mode (ctrl-D to finish)
> import org.apache.spark.sql.functions.lit
> case class Foo(id: String)
> val col = lit("123")
> val df = sc.range(0,10,1,1).map { _ => Foo("") }
> // Exiting paste mode, now interpreting.
> org.apache.spark.SparkException: Task not serializable
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
> at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:393)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
> at org.apache.spark.rdd.RDD.map(RDD.scala:392)
> ... 45 elided
> Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
> Serialization stack:
> - object not serializable (class: org.apache.spark.sql.Column, value: 123)
> - field (class: $iw, name: col, type: class org.apache.spark.sql.Column)
> - object (class $iw, $iw@73534675)
> - element of array (index: 0)
> - array (class [Ljava.lang.Object;, size 1)
> - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
> - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
> - writeReplace data (class: java.lang.invoke.SerializedLambda)
> - object (class $Lambda$1952/356563238, $Lambda$1952/356563238@6ca95b1e)
> at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
> at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
> ... 53 more
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org