Posted to issues@spark.apache.org by "Chen Fan (JIRA)" <ji...@apache.org> on 2019/01/25 02:15:00 UTC

[jira] [Resolved] (SPARK-26569) Fixed point for batch Operator Optimizations never reached when optimize logicalPlan

     [ https://issues.apache.org/jira/browse/SPARK-26569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chen Fan resolved SPARK-26569.
------------------------------
    Resolution: Duplicate

> Fixed point for batch Operator Optimizations never reached when optimize logicalPlan
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-26569
>                 URL: https://issues.apache.org/jira/browse/SPARK-26569
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>         Environment:  
>  
>            Reporter: Chen Fan
>            Priority: Major
>
> We have a fairly complicated Spark app built on the Dataset API that runs once a day, and I noticed that it hangs once in a while.
> I added some logging and compared two driver logs, one from a successful run and one from a failed run. Here are the results of the investigation:
> 1. The app usually runs correctly, but sometimes it hangs after finishing job 1.
> !image-2019-01-08-19-53-20-509.png!
> 2. According to the log I added, the successful app always reaches the fixed point at iteration 7 of batch Operator Optimizations, but the failed app never reaches it.
> {code:java}
> 2019-01-04,11:35:34,199 DEBUG org.apache.spark.sql.execution.SparkOptimizer: 
> === Result of Batch Operator Optimizations ===
> 2019-01-04,14:00:42,847 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 6/100, for batch Operator Optimizations
> 2019-01-04,14:00:42,851 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
>  
> 2019-01-04,14:00:42,852 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
>  
> 2019-01-04,14:00:42,903 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
>  
> 2019-01-04,14:00:42,939 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
>  
> 2019-01-04,14:00:42,951 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PruneFilters ===
>  
> 2019-01-04,14:00:42,970 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 7/100, for batch Operator Optimizations
> 2019-01-04,14:00:42,971 INFO org.apache.spark.sql.execution.SparkOptimizer: Fixed point reached for batch Operator Optimizations after 7 iterations.
> 2019-01-04,14:13:15,616 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 45/100, for batch Operator Optimizations
> 2019-01-04,14:13:15,619 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
>  
> 2019-01-04,14:13:15,620 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
>  
> 2019-01-04,14:13:59,529 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
>  
> 2019-01-04,14:13:59,806 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
>  
> 2019-01-04,14:13:59,845 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 46/100, for batch Operator Optimizations
> 2019-01-04,14:13:59,849 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
>  
> 2019-01-04,14:13:59,849 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
>  
> 2019-01-04,14:14:45,340 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
>  
> 2019-01-04,14:14:45,631 INFO org.apache.spark.sql.execution.SparkOptimizer: 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
>  
> 2019-01-04,14:14:45,678 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 47/100, for batch Operator Optimizations
> {code}
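> The iteration counters in this log come from a fixed-point loop: the rules of a batch are applied in order, over and over, until a full pass leaves the plan unchanged or the iteration cap (100 in this log) is reached. Below is a minimal sketch of such a loop. It is not Spark's RuleExecutor code; FixedPointSketch, Rule and runBatch are hypothetical names, and the plan is an arbitrary type P compared with ==.
> {code:scala}
> object FixedPointSketch {
>   // Hypothetical simplification: a rule is just a function from plan to plan.
>   type Rule[P] = P => P
>
>   /** Applies all rules in order, repeatedly, until a full pass leaves the
>     * plan unchanged or maxIterations passes have run. Returns the final
>     * plan and the number of passes that were executed. */
>   def runBatch[P](plan: P, rules: Seq[Rule[P]], maxIterations: Int): (P, Int) = {
>     var current = plan
>     var iteration = 0
>     var changed = true
>     while (changed && iteration < maxIterations) {
>       iteration += 1
>       // One pass: thread the plan through every rule of the batch.
>       val next = rules.foldLeft(current)((p, rule) => rule(p))
>       changed = next != current
>       current = next
>     }
>     (current, iteration)
>   }
> }
> {code}
> If the rules change the plan on every pass, changed never becomes false and the loop stops only at the cap, which would look like the 45/100, 46/100, 47/100 lines above.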
> 3. The two logical plans first diverge when BooleanSimplification is applied during an iteration. Before this rule runs, the two logical plans are the same:
> {code:java}
> // only the head of the plan is shown
> Project [model#2486, version#12, device#11, date#30, imei#13, pType#14, releaseType#15, regionType#16, expType#17, tag#18, imeiCount#1586, startdate#2360, enddate#2361, status#2362]
> +- Join Inner, (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && ((((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2358 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && (date#30 >= startdate#2360)) && (date#30 <= enddate#2361)) && (model#2486 = model#2358)) && (version#12 = version#2359)))
>    :- Project [device#11, version#12, date#30, imei#13, pType#14, releaseType#15, regionType#16, expType#17, tag#18, imeiCount#1586, UDF(model#10, device#11) AS model#2486]
>    :  +- Filter ((((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#1584) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))) && ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#1583, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && isnotnull(UDF(model#1583, device#11))) && isnotnull(UDF(model#10, device#1584))) && (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))) && isnotnull(UDF(model#10, device#11))))
>    :     +- Join Inner, ((((model#10 = model#1583) && (device#11 = device#1584)) && (version#12 = version#1585)) && (date#30 = date#1592))
> {code}
> 4. After BooleanSimplification there is only one difference: the Filter condition has the form (A || B) && (A || C) in the successful app but A || (B && C) in the failed app.
> {code:java}
> // successful app's logical plan
> Filter ((((isnotnull(UDF(model#1583, device#11)) && ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#1583, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && isnotnull(UDF(model#10, device#1584))) && ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#1584) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))) && isnotnull(UDF(model#10, device#11))))
> // failed app's plan
> Filter (((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (UDF(model#10, device#1584) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && UDF(model#1583, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && isnotnull(UDF(model#1583, device#11))) && isnotnull(UDF(model#10, device#1584))) && (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))) && isnotnull(UDF(model#10, device#11))))
> {code}
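> The Filter conditions in the two plans above have the shapes (A || B) && (A || C) and A || (B && C). By the distributive law these are logically equivalent, yet their expression trees differ, so any comparison based on tree shape sees two different plans. A small sketch that checks both facts on a toy expression type (Atom, And, Or and eval are hypothetical and are not Catalyst classes):
> {code:scala}
> object BoolFormSketch {
>   // Toy boolean expression tree; a stand-in for Catalyst expressions.
>   sealed trait Expr
>   case class Atom(name: String) extends Expr
>   case class And(l: Expr, r: Expr) extends Expr
>   case class Or(l: Expr, r: Expr) extends Expr
>
>   // Evaluates an expression under an assignment of truth values to atoms.
>   def eval(e: Expr, env: Map[String, Boolean]): Boolean = e match {
>     case Atom(n)   => env(n)
>     case And(l, r) => eval(l, env) && eval(r, env)
>     case Or(l, r)  => eval(l, env) || eval(r, env)
>   }
>
>   val a: Expr = Atom("A")
>   val b: Expr = Atom("B")
>   val c: Expr = Atom("C")
>
>   val distributed: Expr = And(Or(a, b), Or(a, c)) // (A || B) && (A || C)
>   val factored: Expr = Or(a, And(b, c))           // A || (B && C)
>
>   // All 8 truth assignments for A, B, C.
>   val envs: Seq[Map[String, Boolean]] =
>     for {
>       x <- Seq(true, false)
>       y <- Seq(true, false)
>       z <- Seq(true, false)
>     } yield Map("A" -> x, "B" -> y, "C" -> z)
>
>   // Same truth table, different tree.
>   val logicallyEqual: Boolean =
>     envs.forall(env => eval(distributed, env) == eval(factored, env))
>   val structurallyEqual: Boolean = distributed == factored
> }
> {code}
> Here logicallyEqual is true while structurallyEqual is false, which is why a fixed-point check or a constraint comparison can treat the two forms as different.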
> 5. After that, the problem shows up in the rule InferFiltersFromConstraints:
> {code:java}
> // the code (with added logging) that produces the logs below
> case join @ Join(left, right, joinType, conditionOpt) =>
>       // Only consider constraints that can be pushed down completely to either the left or the
>       // right child
>       val constraints = join.constraints.filter { c =>
>         c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)
>       }
>       // Remove those constraints that are already enforced by either the left or the right child
>       val additionalConstraints = constraints -- (left.constraints ++ right.constraints)
>       logInfo(
>         s"""
>            | After remove constraints additional constraints is  ${additionalConstraints.toList.toString}
>            | left constraints is ${left.constraints.toList.toString()}
>            | right constraints is ${right.constraints.toList.toString()}
>         """.stripMargin)
> {code}
> {code:java}
> // successful app's log
> 2019-01-08,16:44:48,911 INFO org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints:
>  After remove constraints additional constraints is  List(((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))))
>  left constraints is List(isnotnull(model#2486), isnotnull(device#11), isnotnull(date#30), ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))), ((((((((((((((((((((((((((((((20190107 <=> date#30) || (20190106 <=> date#30)) || (20190105 <=> date#30)) || (20190104 <=> date#30)) || (20190103 <=> date#30)) || (20190102 <=> date#30)) || (20190101 <=> date#30)) || (20181231 <=> date#30)) || (20181230 <=> date#30)) || (20181229 <=> date#30)) || (20181228 <=> date#30)) || (20181227 <=> date#30)) || (20181226 <=> date#30)) || (20181225 <=> date#30)) || (20181224 <=> date#30)) || (20181223 <=> date#30)) || (20181222 <=> date#30)) || (20181221 <=> date#30)) || (20181220 <=> date#30)) || (20181219 <=> date#30)) || (20181218 <=> date#30)) || (20181217 <=> date#30)) || (20181216 <=> date#30)) || (20181215 <=> date#30)) || (20181214 <=> date#30)) || (20181213 <=> date#30)) || (20181212 <=> date#30)) || (20181211 <=> date#30)) || (20181210 <=> date#30)) || (20181209 <=> date#30)), isnotnull(version#12))
>  right constraints is List(isnotnull(enddate#2361), isnotnull(startdate#2360), isnotnull(model#2358), isnotnull(version#2359))
> //failed app's log
> 2019-01-08,16:55:11,614 INFO org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints:
>  After remove constraints additional constraints is  List()
>  left constraints is List(isnotnull(date#30), ((((((((((((((((((((((((((((((20190107 <=> date#30) || (20190106 <=> date#30)) || (20190105 <=> date#30)) || (20190104 <=> date#30)) || (20190103 <=> date#30)) || (20190102 <=> date#30)) || (20190101 <=> date#30)) || (20181231 <=> date#30)) || (20181230 <=> date#30)) || (20181229 <=> date#30)) || (20181228 <=> date#30)) || (20181227 <=> date#30)) || (20181226 <=> date#30)) || (20181225 <=> date#30)) || (20181224 <=> date#30)) || (20181223 <=> date#30)) || (20181222 <=> date#30)) || (20181221 <=> date#30)) || (20181220 <=> date#30)) || (20181219 <=> date#30)) || (20181218 <=> date#30)) || (20181217 <=> date#30)) || (20181216 <=> date#30)) || (20181215 <=> date#30)) || (20181214 <=> date#30)) || (20181213 <=> date#30)) || (20181212 <=> date#30)) || (20181211 <=> date#30)) || (20181210 <=> date#30)) || (20181209 <=> date#30)), isnotnull(version#12), isnotnull(device#11), isnotnull(model#2486), ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))), ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))))
>  right constraints is List(isnotnull(enddate#2361), isnotnull(startdate#2360), isnotnull(version#2359), isnotnull(model#2358))
> {code}
> The left child of the failed app's plan has an additional constraint:
> {code:java}
> ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
> {code}
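> The two logs differ because the set difference removes a join constraint only when a child already holds an equal expression. The sketch below reproduces that effect with a toy expression type and plain structural equality, which is an assumption: Catalyst compares expressions through its own expression sets, not through these classes. All names are hypothetical; A stands for the imeiCount comparison and B for the model INSET predicate.
> {code:scala}
> object ConstraintDiffSketch {
>   // Toy boolean expression tree; a stand-in for Catalyst expressions.
>   sealed trait Expr
>   case class Atom(name: String) extends Expr
>   case class And(l: Expr, r: Expr) extends Expr
>   case class Or(l: Expr, r: Expr) extends Expr
>
>   val a: Expr = Atom("A")
>   val b: Expr = Atom("B")
>
>   val plain: Expr = Or(a, b)             // A || B
>   val redundant: Expr = Or(a, And(b, b)) // A || (B && B)
>
>   // Mirrors: constraints -- (left.constraints ++ right.constraints)
>   def additional(join: Set[Expr], left: Set[Expr], right: Set[Expr]): Set[Expr] =
>     join -- (left ++ right)
>
>   val joinConstraints: Set[Expr] = Set(plain, redundant)
>
>   // Successful app: the left child only knows A || B.
>   val successful: Set[Expr] =
>     additional(joinConstraints, Set(plain), Set.empty[Expr])
>
>   // Failed app: the left child also carries A || (B && B).
>   val failed: Set[Expr] =
>     additional(joinConstraints, Set(plain, redundant), Set.empty[Expr])
> }
> {code}
> In this sketch successful contains only A || (B && B) and failed is empty, matching the shape of the two "additional constraints" log lines above.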
> 6. From there the gap between the two apps' plans keeps growing: one app succeeds and the other hangs. There seem to be two possible causes:
>  1. BooleanSimplification is not idempotent
>  2. InferFiltersFromConstraints does not behave correctly when a child's constraints contain A || (B && C) instead of (A || B) && (A || C)
> I'm not sure which one is the root cause. Could someone look into this?
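> One way to probe the first hypothesis would be to apply a rule twice to the same input and compare the results. A generic sketch follows; isIdempotentOn is a hypothetical helper, and for Catalyst the type P would be LogicalPlan with the rule wrapping BooleanSimplification, which is assumed here and not exercised by the code.
> {code:scala}
> object IdempotenceSketch {
>   /** Returns true if applying the rule a second time changes nothing
>     * for every sample input, i.e. rule(rule(p)) == rule(p). */
>   def isIdempotentOn[P](rule: P => P, inputs: Seq[P]): Boolean =
>     inputs.forall { p =>
>       val once = rule(p)
>       rule(once) == once
>     }
>
>   // Toy rules on strings, only to exercise the helper.
>   val trimRule: String => String = _.trim  // idempotent
>   val appendRule: String => String = _ + "x" // not idempotent
> }
> {code}
> A check like this can only show that a rule is not idempotent on the sampled inputs; passing on a few samples does not prove idempotence in general.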



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
