Posted to issues@spark.apache.org by "Jen-Ming Chung (JIRA)" <ji...@apache.org> on 2017/10/31 10:44:00 UTC

[jira] [Issue Comment Deleted] (SPARK-19039) UDF ClosureCleaner bug when UDF, col applied in paste mode in REPL

     [ https://issues.apache.org/jira/browse/SPARK-19039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jen-Ming Chung updated SPARK-19039:
-----------------------------------
    Comment: was deleted

(was: It's weird... you will not get error messages if you paste the code line by line.

{code}
17/10/31 09:37:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://ip-172-31-9-112.ap-northeast-1.compute.internal:4040
Spark context available as 'sc' (master = local[*], app id = local-1509442670084).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0-SNAPSHOT
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_151)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df = spark.createDataFrame(Seq(
     |   ("hi", 1),
     |   ("there", 2),
     |   ("the", 3),
     |   ("end", 4)
     | )).toDF("a", "b")
df: org.apache.spark.sql.DataFrame = [a: string, b: int]

scala> val myNumbers = Set(1,2,3)
myNumbers: scala.collection.immutable.Set[Int] = Set(1, 2, 3)

scala> val tmpUDF = udf { (n: Int) => myNumbers.contains(n) }
tmpUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BooleanType,Some(List(IntegerType)))

scala> val rowHasMyNumber = tmpUDF($"b")
rowHasMyNumber: org.apache.spark.sql.Column = UDF(b)

scala> df.where(rowHasMyNumber).show()
+-----+---+
|    a|  b|
+-----+---+
|   hi|  1|
|there|  2|
|  the|  3|
+-----+---+
{code} )

> UDF ClosureCleaner bug when UDF, col applied in paste mode in REPL
> ------------------------------------------------------------------
>
>                 Key: SPARK-19039
>                 URL: https://issues.apache.org/jira/browse/SPARK-19039
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.3, 2.0.2, 2.1.0, 2.3.0
>            Reporter: Joseph K. Bradley
>
> When I try this:
> * Define a UDF
> * Apply the UDF to get a Column
> * Use the Column in a DataFrame
> I see weird behavior in the spark-shell when using paste mode.
> To reproduce, paste the following into the spark-shell in :paste mode:
> {code}
> import org.apache.spark.sql.functions._
> val df = spark.createDataFrame(Seq(
>   ("hi", 1),
>   ("there", 2),
>   ("the", 3),
>   ("end", 4)
> )).toDF("a", "b")
> val myNumbers = Set(1,2,3)
> val tmpUDF = udf { (n: Int) => myNumbers.contains(n) }
> val rowHasMyNumber = tmpUDF($"b")
> df.where(rowHasMyNumber).show()
> {code}
> Stack trace for Spark 2.0 (similar for other versions):
> {code}
> org.apache.spark.SparkException: Task not serializable
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
> 	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
> 	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
> 	at org.apache.spark.SparkContext.clean(SparkContext.scala:2057)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:817)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:816)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
> 	at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:816)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
> 	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
> 	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
> 	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
> 	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
> 	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2551)
> 	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
> 	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
> 	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
> 	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
> 	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2581)
> 	at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
> 	at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
> 	at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
> 	at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
> 	at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
> 	at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
> 	at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw.<init>(<console>:45)
> 	at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw.<init>(<console>:57)
> 	at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw.<init>(<console>:59)
> 	at linef732283eefe649f4877db916c5ad096f25.$read$$iw.<init>(<console>:61)
> 	at linef732283eefe649f4877db916c5ad096f25.$eval$.$print$lzycompute(<console>:7)
> 	at linef732283eefe649f4877db916c5ad096f25.$eval$.$print(<console>:6)
> Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
> Serialization stack:
> 	- object not serializable (class: org.apache.spark.sql.Column, value: UDF(b))
> 	- field (class: linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw, name: rowHasMyNumber, type: class org.apache.spark.sql.Column)
> 	- object (class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw, linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw@6688375a)
> 	- field (class: linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw$$anonfun$1, name: $outer, type: class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw)
> 	- object (class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw$$anonfun$1, <function1>)
> 	- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
> 	- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
> 	- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
> 	- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[1, int, false]))
> 	- element of array (index: 1)
> 	- array (class [Ljava.lang.Object;, size 2)
> 	- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
> 	- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
> 	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
> 	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
> 	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
> 	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
> 	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
> 	at org.apache.spark.SparkContext.clean(SparkContext.scala:2057)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:817)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:816)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
> 	at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:816)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
> 	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
> 	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
> 	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
> 	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
> 	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2551)
> 	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
> 	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
> 	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
> 	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
> 	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2581)
> 	at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
> 	at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
> 	at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
> 	at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
> 	at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
> 	at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
> 	at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw.<init>(<console>:45)
> 	at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw.<init>(<console>:57)
> 	at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw.<init>(<console>:59)
> 	at linef732283eefe649f4877db916c5ad096f25.$read$$iw.<init>(<console>:61)
> 	at linef732283eefe649f4877db916c5ad096f25.$eval$.$print$lzycompute(<console>:7)
> 	at linef732283eefe649f4877db916c5ad096f25.$eval$.$print(<console>:6)
> {code}
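>
> One possible workaround, sketched below: the serialization stack above points at the Column val (rowHasMyNumber) held by the REPL wrapper object, so applying the UDF inline instead of binding the Column to a val should keep the non-serializable Column out of the captured wrapper. This is only a sketch under that assumption and has not been verified against every affected version.
> {code}
> import org.apache.spark.sql.functions._
>
> val df = spark.createDataFrame(Seq(
>   ("hi", 1),
>   ("there", 2),
>   ("the", 3),
>   ("end", 4)
> )).toDF("a", "b")
>
> val myNumbers = Set(1, 2, 3)
> val tmpUDF = udf { (n: Int) => myNumbers.contains(n) }
>
> // Apply the UDF directly in the filter instead of storing the resulting
> // Column in a val, so the REPL wrapper has no Column field to serialize.
> df.where(tmpUDF($"b")).show()
> {code}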



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org