Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2019/03/29 20:10:00 UTC

[jira] [Resolved] (SPARK-27150) Scheduling Within an Application : Spark SQL randomly failed on UDF

     [ https://issues.apache.org/jira/browse/SPARK-27150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-27150.
-------------------------------
    Resolution: Duplicate

I'm guessing this is a duplicate of SPARK-26555
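
For anyone who hits this before picking up the fix: assuming the root cause is the same thread-safety problem tracked in SPARK-26555 (reflection-based schema inference racing when UDFs are created from several threads at once), one possible workaround is to build the UDF once on the main thread and only register the prebuilt UserDefinedFunction from the parallel workers. A minimal sketch (sparkMain and inputPaths are the names used in the report below; myUdfPrebuilt is illustrative):

{code:java}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

def myUdf(input: java.lang.String): Option[Long] = None

// Build the UDF once on the driver's main thread so the reflection-based
// return-type inference inside udf(...) is never run concurrently.
val myUdfPrebuilt: UserDefinedFunction = udf(myUdf _)

inputPaths.toList.par.foreach { inputPath =>
  val spark = sparkMain.newSession()
  // Register the already-built UserDefinedFunction in each cloned session.
  spark.udf.register("myUdf", myUdfPrebuilt)
  // ... load inputPath, run the SQL and write, exactly as in the reproduction below ...
}
{code}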

> Scheduling Within an Application : Spark SQL randomly failed on UDF
> -------------------------------------------------------------------
>
>                 Key: SPARK-27150
>                 URL: https://issues.apache.org/jira/browse/SPARK-27150
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 2.3.1, 2.3.2, 2.3.3, 2.4.0
>            Reporter: Josh Sean
>            Priority: Major
>
> I run the following (reduced) code multiple times on the exact same input files:
> {code:java}
> // UDF reduced to the bare minimum: it always returns None
> def myUdf(input: java.lang.String): Option[Long] = {
>   None
> }
> ...
> val sparkMain = ... .getOrCreate()
> // One Spark SQL job per input path, submitted in parallel from the driver
> val d = inputPaths.toList.par
> val p = new scala.concurrent.forkjoin.ForkJoinPool(12)
> try {
>   d.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(p)
>   d.foreach {
>     case (inputPath) => {
>       val spark = sparkMain.newSession()
>
>       spark.udf.register("myUdf", udf(myUdf _))
>       val df = spark.read.format("csv").option("inferSchema", "false").option("mode", "DROPMALFORMED").schema(mySchema).load(inputPath)
>       df.createOrReplaceTempView("mytable")
>       val sql = spark.sql(""" SELECT CAST( myUdf(updated_date) as long) FROM mytable """)
>       sql.write.parquet( ... )
>     }
>   }
> } finally {
>   p.shutdown()
> }{code}
> Roughly one run in ten (of the same spark-submit), the driver fails with an exception related to Spark SQL and the UDF. As you can see, I have reduced the UDF to the bare minimum: it now returns None every time, and the problem still occurs. So I think the problem is more likely related to the driver submitting multiple jobs in parallel, i.e. "scheduling within an application".
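> One way to check that hypothesis (a sketch only; the analysisLock object below is illustrative and not part of the real job) would be to keep the jobs parallel but serialize just the per-session UDF registration and SQL analysis:
> {code:java}
> // Hypothesis check (sketch): if the failure disappears when the UDF creation
> // and SQL analysis are serialized while the jobs themselves still run in
> // parallel, the race is in query construction rather than in job execution.
> val analysisLock = new Object
> d.foreach {
>   case (inputPath) => {
>     val spark = sparkMain.newSession()
>     val sql = analysisLock.synchronized {
>       spark.udf.register("myUdf", udf(myUdf _))
>       val df = spark.read.format("csv").option("inferSchema", "false").option("mode", "DROPMALFORMED").schema(mySchema).load(inputPath)
>       df.createOrReplaceTempView("mytable")
>       spark.sql(""" SELECT CAST( myUdf(updated_date) as long) FROM mytable """)
>     }
>     // The actual read/write job still runs concurrently across threads.
>     sql.write.parquet( ... )
>   }
> }
> {code}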
> The exception is as follows:
> {code:java}
> Exception in thread "main" java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
> at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
> Caused by: org.apache.spark.sql.AnalysisException: cannot resolve 'CAST(UDF(updated_date) AS BIGINT)' due to data type mismatch: cannot cast struct<> to bigint; line 5 pos 10;
> ...
> at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
> at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
> at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
> at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
> at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
> at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
> at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
> at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
> at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
> at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
> at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
> at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
> at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
> at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
> ...
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
> at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
> at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
> at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
> at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
> at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
> at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
> at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
> at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> So basically, with the exact same inputs, nine times out of ten everything succeeds, but roughly one time out of ten the exception above occurs. This is very strange and tends to indicate some side effect inside Spark core when scheduling multiple jobs within one application.
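> For reference, here is a standalone sketch (illustrative only) that does nothing but create the same UDF from many threads and check the return type Spark infers; if it ever comes back as an empty struct instead of bigint, that would match the CAST error above:
> {code:java}
> import org.apache.spark.sql.functions.udf
>
> def myUdf(input: java.lang.String): Option[Long] = None
>
> // Expected inferred return type: "bigint" every time. Anything else,
> // e.g. an empty struct, would point at a race in the reflection-based
> // type inference performed by udf(...).
> (1 to 1000).toList.par.foreach { _ =>
>   val inferred = udf(myUdf _).dataType.simpleString
>   if (inferred != "bigint") {
>     println(s"unexpected inferred return type: $inferred")
>   }
> }
> {code}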
> Thanks for investigating
> *EDIT:*
> When I do:
> {code:java}
> val sql = spark.sql(""" SELECT myUdf(updated_date) FROM mytable """)
> {code}
> Instead of:
> {code:java}
> val sql = spark.sql(""" SELECT CAST( myUdf(updated_date) as long) FROM mytable """)
> {code}
> Without the CAST I am no longer forcing the result to be either a long or null, so the failure shows up later, at the Parquet level instead:
> {code:java}
> Caused by: org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: optional group updated_date {
> }
> {code}
> So it seems that in this case (when scheduling within the application) the UDF's result type is randomly resolved as an empty struct/group instead of a nullable long ...
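> As a side note, a possible way to sidestep the type inference entirely (a sketch, not verified here) is to declare the UDF's return type explicitly instead of letting Spark infer it from Option[Long], for example by returning a nullable java.lang.Long:
> {code:java}
> import org.apache.spark.sql.functions.udf
> import org.apache.spark.sql.types.LongType
>
> // Nullable Long instead of Option[Long]; the return type is passed
> // explicitly, so no reflection-based inference is involved.
> def myUdfNullable(input: java.lang.String): java.lang.Long = null
>
> spark.udf.register("myUdf", udf(myUdfNullable _, LongType))
> {code}
> The assumption is that the reflection-based inference of Option[Long] is what goes wrong under concurrency; if that is not the root cause, this will not help.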


