Posted to issues@spark.apache.org by "Sean R. Owen (Jira)" <ji...@apache.org> on 2019/10/26 23:47:00 UTC
[jira] [Resolved] (SPARK-26534) Closure Cleaner Bug
[ https://issues.apache.org/jira/browse/SPARK-26534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean R. Owen resolved SPARK-26534.
----------------------------------
Resolution: Not A Problem
I think that's as expected, at least as far as why there is a connection in the closure: the map function refers to foo, which lives in an instance of a function, so it needs all of that function, and that function holds a reference to thingy.
You are right that you and I can see it isn't actually needed, but Spark a) can't infer all of those cases and b) can't necessarily null out references in _transitively_ dependent objects, because doing so could modify local object state.
I think the answer is simply to express it differently to avoid those entanglements. If that's correct, then I don't consider this a Spark bug; if there's a robust way to handle this case without breaking other code, it could be OK to propose.
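One possible way to "express it differently", as a minimal sketch only: build the function passed to map inside a method of a standalone object, so that it captures just the value it needs and not an enclosing function instance that holds thingy. The names MapFuncs, appendSuffix, ClosureCheck and isJavaSerializable are hypothetical and do not come from the report. The check uses plain Java serialization as a stand-in for the JavaSerializerInstance call that appears in the trace, it does not run Spark at all, and it has not been verified against Spark 2.3.1.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Same shape as the non-serializable class in the report.
class Thingy {
  def run[R](block: () => R): R = block()
}

// Hypothetical helper: the function is created in a method of a top-level
// object, so it captures only `suffix` and no enclosing closure.
object MapFuncs {
  def appendSuffix(suffix: String): String => String = r => r + suffix
}

object ClosureCheck {
  // Returns true if `obj` can be written with plain Java serialization.
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val thingy = new Thingy
    val someFunc = () => {
      val foo = "foo"
      thingy.run(() => {
        val f = MapFuncs.appendSuffix(foo)
        // In the Spark job this would be: landedData.map(f).count()
        (isJavaSerializable(f), f("bar"))
      })
    }
    println(someFunc())
  }
}
```

In the job from the report, the assumption is that landedData.map(MapFuncs.appendSuffix(foo)) would take the place of the inline r => r + foo lambda, leaving thingy and someFunc out of the object graph that has to be serialized.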
> Closure Cleaner Bug
> -------------------
>
> Key: SPARK-26534
> URL: https://issues.apache.org/jira/browse/SPARK-26534
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.3.1
> Reporter: sam
> Priority: Minor
>
> I've found a strange combination of closures where the closure cleaner doesn't seem to be smart enough to figure out how to remove a reference that is not used, i.e. we get an `org.apache.spark.SparkException: Task not serializable` for a Task that is perfectly serializable.
>
> In the example below, the only `val` that is actually needed for the closure of the `map` is `foo`, but it tries to serialise `thingy`. What is odd is that changing this code in a number of subtle ways eliminates the error; I've tried to highlight these with inline comments.
>
> {code:java}
> import org.apache.spark.sql._
>
> object Test {
>   val sparkSession: SparkSession =
>     SparkSession.builder.master("local").appName("app").getOrCreate()
>
>   def apply(): Unit = {
>     import sparkSession.implicits._
>
>     val landedData: Dataset[String] = sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()
>
>     // thingy has to be in this outer scope to reproduce; if defined inside someFunc, cannot reproduce
>     val thingy: Thingy = new Thingy
>
>     // If not wrapped in someFunc, cannot reproduce
>     val someFunc = () => {
>       // If foo is not referenced inside the closure (e.g. just use the identity function), cannot reproduce
>       val foo: String = "foo"
>
>       thingy.run(block = () => {
>         landedData.map(r => {
>           r + foo
>         })
>           .count()
>       })
>     }
>
>     someFunc()
>   }
> }
>
> class Thingy {
>   def run[R](block: () => R): R = {
>     block()
>   }
> }
> {code}
> The full trace when run in `sbt console`:
> {code}
> scala> class Thingy {
> | def run[R](block: () => R): R = {
> | block()
> | }
> | }
> defined class Thingy
> scala>
> scala> object Test {
> | val sparkSession: SparkSession =
> | SparkSession.builder.master("local").appName("app").getOrCreate()
> |
> | def apply(): Unit = {
> | import sparkSession.implicits._
> |
> | val landedData: Dataset[String] = sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()
> |
> | // thingy has to be in this outer scope to reproduce, if in someFunc, cannot reproduce
> | val thingy: Thingy = new Thingy
> |
> | // If not wrapped in someFunc cannot reproduce
> | val someFunc = () => {
> | // If don't reference this foo inside the closer (e.g. just use identity function) cannot reproduce
> | val foo: String = "foo"
> |
> | thingy.run(block = () => {
> | landedData.map(r => {
> | r + foo
> | })
> | .count()
> | })
> | }
> |
> | someFunc()
> |
> | }
> | }
> defined object Test
> scala>
> scala>
> scala> Test.apply()
> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> 19/01/07 11:27:19 INFO SparkContext: Running Spark version 2.3.1
> 19/01/07 11:27:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 19/01/07 11:27:20 INFO SparkContext: Submitted application: app
> 19/01/07 11:27:20 INFO SecurityManager: Changing view acls to: sams
> 19/01/07 11:27:20 INFO SecurityManager: Changing modify acls to: sams
> 19/01/07 11:27:20 INFO SecurityManager: Changing view acls groups to:
> 19/01/07 11:27:20 INFO SecurityManager: Changing modify acls groups to:
> 19/01/07 11:27:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sams); groups with view permissions: Set(); users with modify permissions: Set(sams); groups with modify permissions: Set()
> 19/01/07 11:27:20 INFO Utils: Successfully started service 'sparkDriver' on port 54066.
> 19/01/07 11:27:20 INFO SparkEnv: Registering MapOutputTracker
> 19/01/07 11:27:20 INFO SparkEnv: Registering BlockManagerMaster
> 19/01/07 11:27:20 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
> 19/01/07 11:27:20 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
> 19/01/07 11:27:20 INFO DiskBlockManager: Created local directory at /private/var/folders/x9/r21b5ttd1wx8zq9qtckfp411n7085c/T/blockmgr-c35bdd46-4804-427b-a513-ee8778814f88
> 19/01/07 11:27:20 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
> 19/01/07 11:27:20 INFO SparkEnv: Registering OutputCommitCoordinator
> 19/01/07 11:27:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.
> 19/01/07 11:27:20 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.197.196.44:4040
> 19/01/07 11:27:21 INFO Executor: Starting executor ID driver on host localhost
> 19/01/07 11:27:21 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54067.
> 19/01/07 11:27:21 INFO NettyBlockTransferService: Server created on 10.197.196.44:54067
> 19/01/07 11:27:21 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
> 19/01/07 11:27:21 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.197.196.44, 54067, None)
> 19/01/07 11:27:21 INFO BlockManagerMasterEndpoint: Registering block manager 10.197.196.44:54067 with 366.3 MB RAM, BlockManagerId(driver, 10.197.196.44, 54067, None)
> 19/01/07 11:27:21 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.197.196.44, 54067, None)
> 19/01/07 11:27:21 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.197.196.44, 54067, None)
> 19/01/07 11:27:23 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/sams/src/asos-datalake-staging/staging/spark-warehouse/').
> 19/01/07 11:27:23 INFO SharedState: Warehouse path is 'file:/Users/sams/src/asos-datalake-staging/staging/spark-warehouse/'.
> 19/01/07 11:27:23 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
> 19/01/07 11:27:24 INFO CodeGenerator: Code generated in 257.291388 ms
> 19/01/07 11:27:24 INFO CodeGenerator: Code generated in 33.985273 ms
> org.apache.spark.SparkException: Task not serializable
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:844)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:843)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
> at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:843)
> at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:608)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
> at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
> at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
> at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
> at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
> at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
> at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
> at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
> at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
> at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
> at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
> at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply$mcJ$sp(<console>:36)
> at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(<console>:32)
> at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(<console>:32)
> at Thingy.run(<console>:16)
> at Test$$anonfun$1.apply$mcJ$sp(<console>:32)
> at Test$.apply(<console>:40)
> ... 40 elided
> Caused by: java.io.NotSerializableException: Thingy
> Serialization stack:
> - object not serializable (class: Thingy, value: Thingy@679723a6)
> - field (class: Test$$anonfun$1, name: thingy$1, type: class Thingy)
> - object (class Test$$anonfun$1, <function0>)
> - field (class: Test$$anonfun$1$$anonfun$apply$mcJ$sp$1, name: $outer, type: class Test$$anonfun$1)
> - object (class Test$$anonfun$1$$anonfun$apply$mcJ$sp$1, <function0>)
> - field (class: Test$$anonfun$1$$anonfun$apply$mcJ$sp$1$$anonfun$apply$mcJ$sp$2, name: $outer, type: class Test$$anonfun$1$$anonfun$apply$mcJ$sp$1)
> - object (class Test$$anonfun$1$$anonfun$apply$mcJ$sp$1$$anonfun$apply$mcJ$sp$2, <function1>)
> - element of array (index: 0)
> - array (class [Ljava.lang.Object;, size 3)
> - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, name: references$1, type: class [Ljava.lang.Object;)
> - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, <function2>)
> at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
> at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
> ... 90 more
> scala>
> {code}