Posted to issues@spark.apache.org by "Sean R. Owen (Jira)" <ji...@apache.org> on 2019/10/26 23:47:00 UTC

[jira] [Resolved] (SPARK-26534) Closure Cleaner Bug

     [ https://issues.apache.org/jira/browse/SPARK-26534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean R. Owen resolved SPARK-26534.
----------------------------------
    Resolution: Not A Problem

I think that's as expected, at least as to why there is a connection in the closure: the map function refers to foo, which lives in an instance of the enclosing function, so it needs all of that function instance; and that instance holds a reference to thingy.

You are right that a human reading the code can see the reference isn't actually needed, but Spark a) can't infer all of those cases and b) also can't necessarily null out references in _transitive_ dependent objects, because doing so could modify local object state.
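
As a rough illustration of (b), with hypothetical names (this is not Spark internals): an object that a closure only reaches transitively may also be referenced by other driver-side code, so "cleaning" it by nulling its fields would be an observable mutation:

{code:java}
// Hypothetical sketch, not Spark code.
object TransitiveStateExample {
  class Thingy                                 // not Serializable
  class Holder(var thingy: Thingy)

  val shared  = new Holder(new Thingy)         // also used elsewhere on the driver
  val closure = () => shared.hashCode()        // reaches thingy only transitively

  // A cleaner that made `closure` serializable by setting shared.thingy = null
  // would silently break any other driver code still relying on shared.thingy.
}
{code}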

I think the answer is to just express it differently to avoid those entanglements. If that's correct, then I don't consider this a Spark bug; if there's a robust way to handle this case without breaking other code, that could be worth proposing.
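
For example, an untested sketch of one such restructuring, based on the reporter's own inline comments below (which note that declaring thingy inside someFunc does not reproduce the error) and reusing the Thingy and landedData definitions from the example:

{code:java}
val someFunc = () => {
  // Declaring thingy here rather than in the enclosing scope (per the
  // reporter's comment "if in someFunc, cannot reproduce") keeps the
  // non-serializable instance out of the captured outer objects.
  val thingy: Thingy = new Thingy
  val foo: String = "foo"
  thingy.run(block = () => {
    landedData.map(r => r + foo).count()
  })
}
someFunc()
{code}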

> Closure Cleaner Bug
> -------------------
>
>                 Key: SPARK-26534
>                 URL: https://issues.apache.org/jira/browse/SPARK-26534
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.1
>            Reporter: sam
>            Priority: Minor
>
> I've found a strange combination of closures where the closure cleaner doesn't seem to be smart enough to figure out how to remove a reference that is not used. I.e. we get an `org.apache.spark.SparkException: Task not serializable` for a Task that is perfectly serializable.
>  
> In the example below, the only `val` that is actually needed for the closure of the `map` is `foo`, but it tries to serialise `thingy`. What is odd is that changing this code in a number of subtle ways eliminates the error, which I've tried to highlight using comments inline.
>  
> {code:java}
> import org.apache.spark.sql._
> object Test {
>   val sparkSession: SparkSession =
>     SparkSession.builder.master("local").appName("app").getOrCreate()
>   def apply(): Unit = {
>     import sparkSession.implicits._
>     val landedData: Dataset[String] = sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()
>     // thingy has to be in this outer scope to reproduce; if it is declared inside someFunc, the error cannot be reproduced
>     val thingy: Thingy = new Thingy
>     // If not wrapped in someFunc, the error cannot be reproduced
>     val someFunc = () => {
>       // If foo is not referenced inside the closure (e.g. just use the identity function), the error cannot be reproduced
>       val foo: String = "foo"
>       thingy.run(block = () => {
>         landedData.map(r => {
>           r + foo
>         })
>         .count()
>       })
>     }
>     someFunc()
>   }
> }
> class Thingy {
>   def run[R](block: () => R): R = {
>     block()
>   }
> }
> {code}
> The full trace when run in `sbt console`:
> {code}
> scala> class Thingy {
>      |   def run[R](block: () => R): R = {
>      |     block()
>      |   }
>      | }
> defined class Thingy
> scala> 
> scala> object Test {
>      |   val sparkSession: SparkSession =
>      |     SparkSession.builder.master("local").appName("app").getOrCreate()
>      | 
>      |   def apply(): Unit = {
>      |     import sparkSession.implicits._
>      | 
>      |     val landedData: Dataset[String] = sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()
>      | 
>      |     // thingy has to be in this outer scope to reproduce, if in someFunc, cannot reproduce
>      |     val thingy: Thingy = new Thingy
>      | 
>      |     // If not wrapped in someFunc cannot reproduce
>      |     val someFunc = () => {
>      |       // If don't reference this foo inside the closer (e.g. just use identity function) cannot reproduce
>      |       val foo: String = "foo"
>      | 
>      |       thingy.run(block = () => {
>      |         landedData.map(r => {
>      |           r + foo
>      |         })
>      |         .count()
>      |       })
>      |     }
>      | 
>      |     someFunc()
>      | 
>      |   }
>      | }
> defined object Test
> scala> 
> scala> 
> scala> Test.apply()
> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> 19/01/07 11:27:19 INFO SparkContext: Running Spark version 2.3.1
> 19/01/07 11:27:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 19/01/07 11:27:20 INFO SparkContext: Submitted application: app
> 19/01/07 11:27:20 INFO SecurityManager: Changing view acls to: sams
> 19/01/07 11:27:20 INFO SecurityManager: Changing modify acls to: sams
> 19/01/07 11:27:20 INFO SecurityManager: Changing view acls groups to: 
> 19/01/07 11:27:20 INFO SecurityManager: Changing modify acls groups to: 
> 19/01/07 11:27:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(sams); groups with view permissions: Set(); users  with modify permissions: Set(sams); groups with modify permissions: Set()
> 19/01/07 11:27:20 INFO Utils: Successfully started service 'sparkDriver' on port 54066.
> 19/01/07 11:27:20 INFO SparkEnv: Registering MapOutputTracker
> 19/01/07 11:27:20 INFO SparkEnv: Registering BlockManagerMaster
> 19/01/07 11:27:20 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
> 19/01/07 11:27:20 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
> 19/01/07 11:27:20 INFO DiskBlockManager: Created local directory at /private/var/folders/x9/r21b5ttd1wx8zq9qtckfp411n7085c/T/blockmgr-c35bdd46-4804-427b-a513-ee8778814f88
> 19/01/07 11:27:20 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
> 19/01/07 11:27:20 INFO SparkEnv: Registering OutputCommitCoordinator
> 19/01/07 11:27:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.
> 19/01/07 11:27:20 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.197.196.44:4040
> 19/01/07 11:27:21 INFO Executor: Starting executor ID driver on host localhost
> 19/01/07 11:27:21 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54067.
> 19/01/07 11:27:21 INFO NettyBlockTransferService: Server created on 10.197.196.44:54067
> 19/01/07 11:27:21 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
> 19/01/07 11:27:21 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.197.196.44, 54067, None)
> 19/01/07 11:27:21 INFO BlockManagerMasterEndpoint: Registering block manager 10.197.196.44:54067 with 366.3 MB RAM, BlockManagerId(driver, 10.197.196.44, 54067, None)
> 19/01/07 11:27:21 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.197.196.44, 54067, None)
> 19/01/07 11:27:21 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.197.196.44, 54067, None)
> 19/01/07 11:27:23 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/sams/src/asos-datalake-staging/staging/spark-warehouse/').
> 19/01/07 11:27:23 INFO SharedState: Warehouse path is 'file:/Users/sams/src/asos-datalake-staging/staging/spark-warehouse/'.
> 19/01/07 11:27:23 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
> 19/01/07 11:27:24 INFO CodeGenerator: Code generated in 257.291388 ms
> 19/01/07 11:27:24 INFO CodeGenerator: Code generated in 33.985273 ms
> org.apache.spark.SparkException: Task not serializable
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
>   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:844)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:843)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>   at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:843)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:608)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>   at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
>   at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
>   at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
>   at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
>   at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
>   at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
>   at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
>   at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply$mcJ$sp(<console>:36)
>   at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(<console>:32)
>   at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(<console>:32)
>   at Thingy.run(<console>:16)
>   at Test$$anonfun$1.apply$mcJ$sp(<console>:32)
>   at Test$.apply(<console>:40)
>   ... 40 elided
> Caused by: java.io.NotSerializableException: Thingy
> Serialization stack:
> 	- object not serializable (class: Thingy, value: Thingy@679723a6)
> 	- field (class: Test$$anonfun$1, name: thingy$1, type: class Thingy)
> 	- object (class Test$$anonfun$1, <function0>)
> 	- field (class: Test$$anonfun$1$$anonfun$apply$mcJ$sp$1, name: $outer, type: class Test$$anonfun$1)
> 	- object (class Test$$anonfun$1$$anonfun$apply$mcJ$sp$1, <function0>)
> 	- field (class: Test$$anonfun$1$$anonfun$apply$mcJ$sp$1$$anonfun$apply$mcJ$sp$2, name: $outer, type: class Test$$anonfun$1$$anonfun$apply$mcJ$sp$1)
> 	- object (class Test$$anonfun$1$$anonfun$apply$mcJ$sp$1$$anonfun$apply$mcJ$sp$2, <function1>)
> 	- element of array (index: 0)
> 	- array (class [Ljava.lang.Object;, size 3)
> 	- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, name: references$1, type: class [Ljava.lang.Object;)
> 	- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, <function2>)
>   at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
>   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
>   ... 90 more
> scala> 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org