Posted to issues@spark.apache.org by "Chen Fan (JIRA)" <ji...@apache.org> on 2019/01/25 02:15:00 UTC
[jira] [Resolved] (SPARK-26569) Fixed point for batch Operator Optimizations never reached when optimize logicalPlan
[ https://issues.apache.org/jira/browse/SPARK-26569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chen Fan resolved SPARK-26569.
------------------------------
Resolution: Duplicate
> Fixed point for batch Operator Optimizations never reached when optimize logicalPlan
> ------------------------------------------------------------------------------------
>
> Key: SPARK-26569
> URL: https://issues.apache.org/jira/browse/SPARK-26569
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.0
> Environment:
>
> Reporter: Chen Fan
> Priority: Major
>
> There is a somewhat complicated Spark application using the DataSet API that runs once a day, and I noticed that it hangs once in a while.
> I added some logging and compared two driver logs, one from a successful run and one from a failed run. Here are the results of the investigation:
> 1. Usually the app runs correctly, but sometimes it hangs after finishing job 1
> !image-2019-01-08-19-53-20-509.png!
> 2. According to the logging I added, the successful app always reaches the fixed point for the batch Operator Optimizations at iteration 7, but the failed app never reaches it.
> {code:java}
> 2019-01-04,11:35:34,199 DEBUG org.apache.spark.sql.execution.SparkOptimizer:
> === Result of Batch Operator Optimizations ===
> 2019-01-04,14:00:42,847 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 6/100, for batch Operator Optimizations
> 2019-01-04,14:00:42,851 INFO org.apache.spark.sql.execution.SparkOptimizer:
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
>
> 2019-01-04,14:00:42,852 INFO org.apache.spark.sql.execution.SparkOptimizer:
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
>
> 2019-01-04,14:00:42,903 INFO org.apache.spark.sql.execution.SparkOptimizer:
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
>
> 2019-01-04,14:00:42,939 INFO org.apache.spark.sql.execution.SparkOptimizer:
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
>
> 2019-01-04,14:00:42,951 INFO org.apache.spark.sql.execution.SparkOptimizer:
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PruneFilters ===
>
> 2019-01-04,14:00:42,970 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 7/100, for batch Operator Optimizations
> 2019-01-04,14:00:42,971 INFO org.apache.spark.sql.execution.SparkOptimizer: Fixed point reached for batch Operator Optimizations after 7 iterations.
> 2019-01-04,14:13:15,616 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 45/100, for batch Operator Optimizations
> 2019-01-04,14:13:15,619 INFO org.apache.spark.sql.execution.SparkOptimizer:
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
>
> 2019-01-04,14:13:15,620 INFO org.apache.spark.sql.execution.SparkOptimizer:
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
>
> 2019-01-04,14:13:59,529 INFO org.apache.spark.sql.execution.SparkOptimizer:
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
>
> 2019-01-04,14:13:59,806 INFO org.apache.spark.sql.execution.SparkOptimizer:
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
>
> 2019-01-04,14:13:59,845 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 46/100, for batch Operator Optimizations
> 2019-01-04,14:13:59,849 INFO org.apache.spark.sql.execution.SparkOptimizer:
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
>
> 2019-01-04,14:13:59,849 INFO org.apache.spark.sql.execution.SparkOptimizer:
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
>
> 2019-01-04,14:14:45,340 INFO org.apache.spark.sql.execution.SparkOptimizer:
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
>
> 2019-01-04,14:14:45,631 INFO org.apache.spark.sql.execution.SparkOptimizer:
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
>
> 2019-01-04,14:14:45,678 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 47/100, for batch Operator Optimizations
> {code}
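For context, Catalyst's RuleExecutor runs a fixed-point batch by reapplying its rules until the plan stops changing, giving up after maxIterations (100 in the logs above). A minimal, language-agnostic sketch of that loop in Python (function and variable names are illustrative, not Spark's actual API):

```python
def run_batch(plan, rules, max_iterations=100):
    """Apply a batch of rewrite rules until a fixed point is reached.

    Mirrors the fixed-point strategy of Catalyst's RuleExecutor: if an
    iteration leaves the plan unchanged, the batch has converged;
    otherwise it gives up (without converging) after max_iterations.
    """
    for iteration in range(1, max_iterations + 1):
        new_plan = plan
        for rule in rules:
            new_plan = rule(new_plan)
        if new_plan == plan:  # fixed point: no rule changed the plan
            print(f"Fixed point reached after {iteration} iterations.")
            return new_plan
        plan = new_plan
    print(f"Max iterations ({max_iterations}) reached without a fixed point.")
    return plan
```

In the failed run above, every iteration keeps changing the plan, so the loop never takes the early-return branch and runs all the way to the iteration cap.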
> 3. The difference between the two logical plans first appears after BooleanSimplification within an iteration; before this rule, the two logical plans are identical:
> {code:java}
> // just a head part of plan
> Project [model#2486, version#12, device#11, date#30, imei#13, pType#14, releaseType#15, regionType#16, expType#17, tag#18, imeiCount#1586, startdate#2360, enddate#2361, status#2362]
> +- Join Inner, (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && ((((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2358 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && (date#30 >= startdate#2360)) && (date#30 <= enddate#2361)) && (model#2486 = model#2358)) && (version#12 = version#2359)))
> :- Project [device#11, version#12, date#30, imei#13, pType#14, releaseType#15, regionType#16, expType#17, tag#18, imeiCount#1586, UDF(model#10, device#11) AS model#2486]
> : +- Filter ((((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#1584) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))) && ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#1583, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && isnotnull(UDF(model#1583, device#11))) && isnotnull(UDF(model#10, device#1584))) && (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))) && isnotnull(UDF(model#10, device#11))))
> : +- Join Inner, ((((model#10 = model#1583) && (device#11 = device#1584)) && (version#12 = version#1585)) && (date#30 = date#1592))
> {code}
> 4. After BooleanSimplification there is only one difference: the Filter's constraints have the form (A || B) && (A || C) in the successful app but A || (B && C) in the failed app.
> {code:java}
> //successful app's logical plan
> Filter ((((isnotnull(UDF(model#1583, device#11)) && ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#1583, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && isnotnull(UDF(model#10, device#1584))) && ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#1584) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))) && isnotnull(UDF(model#10, device#11))))
> // failed app's plan
> Filter (((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (UDF(model#10, device#1584) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && UDF(model#1583, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && isnotnull(UDF(model#1583, device#11))) && isnotnull(UDF(model#10, device#1584))) && (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))) && isnotnull(UDF(model#10, device#11))))
> {code}
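Note that the two shapes are logically equivalent by the distributive law, A || (B && C) == (A || B) && (A || C), so both plans encode the same predicate; only the syntactic form differs. A quick exhaustive check (plain Python, unrelated to Spark's code):

```python
from itertools import product

# Verify the distributive law behind the two shapes seen in the plans:
# A || (B && C) is logically equivalent to (A || B) && (A || C).
for a, b, c in product([False, True], repeat=3):
    factored = a or (b and c)            # failed app's shape
    distributed = (a or b) and (a or c)  # successful app's shape
    assert factored == distributed
print("equivalent for all 8 assignments")
```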
> 5. Thereafter, the error occurs in the rule InferFiltersFromConstraints:
> {code:java}
> // the instrumented code that produced the log excerpts below
> case join @ Join(left, right, joinType, conditionOpt) =>
>   // Only consider constraints that can be pushed down completely to either the left or the
>   // right child
>   val constraints = join.constraints.filter { c =>
>     c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)
>   }
>   // Remove those constraints that are already enforced by either the left or the right child
>   val additionalConstraints = constraints -- (left.constraints ++ right.constraints)
>   logInfo(
>     s"""
>        | After remove constraints additional constraints is ${additionalConstraints.toList.toString}
>        | left constraints is ${left.constraints.toList.toString()}
>        | right constraints is ${right.constraints.toList.toString()}
>      """.stripMargin)
> {code}
> {code:java}
> // successful app's log
> 2019-01-08,16:44:48,911 INFO org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints:
> After remove constraints additional constraints is List(((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))))
> left constraints is List(isnotnull(model#2486), isnotnull(device#11), isnotnull(date#30), ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))), ((((((((((((((((((((((((((((((20190107 <=> date#30) || (20190106 <=> date#30)) || (20190105 <=> date#30)) || (20190104 <=> date#30)) || (20190103 <=> date#30)) || (20190102 <=> date#30)) || (20190101 <=> date#30)) || (20181231 <=> date#30)) || (20181230 <=> date#30)) || (20181229 <=> date#30)) || (20181228 <=> date#30)) || (20181227 <=> date#30)) || (20181226 <=> date#30)) || (20181225 <=> date#30)) || (20181224 <=> date#30)) || (20181223 <=> date#30)) || (20181222 <=> date#30)) || (20181221 <=> date#30)) || (20181220 <=> date#30)) || (20181219 <=> date#30)) || (20181218 <=> date#30)) || (20181217 <=> date#30)) || (20181216 <=> date#30)) || (20181215 <=> date#30)) || (20181214 <=> date#30)) || (20181213 <=> date#30)) || (20181212 <=> date#30)) || (20181211 <=> date#30)) || (20181210 <=> date#30)) || (20181209 <=> date#30)), isnotnull(version#12))
> right constraints is List(isnotnull(enddate#2361), isnotnull(startdate#2360), isnotnull(model#2358), isnotnull(version#2359))
> //failed app's log
> 2019-01-08,16:55:11,614 INFO org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints:
> After remove constraints additional constraints is List()
> left constraints is List(isnotnull(date#30), ((((((((((((((((((((((((((((((20190107 <=> date#30) || (20190106 <=> date#30)) || (20190105 <=> date#30)) || (20190104 <=> date#30)) || (20190103 <=> date#30)) || (20190102 <=> date#30)) || (20190101 <=> date#30)) || (20181231 <=> date#30)) || (20181230 <=> date#30)) || (20181229 <=> date#30)) || (20181228 <=> date#30)) || (20181227 <=> date#30)) || (20181226 <=> date#30)) || (20181225 <=> date#30)) || (20181224 <=> date#30)) || (20181223 <=> date#30)) || (20181222 <=> date#30)) || (20181221 <=> date#30)) || (20181220 <=> date#30)) || (20181219 <=> date#30)) || (20181218 <=> date#30)) || (20181217 <=> date#30)) || (20181216 <=> date#30)) || (20181215 <=> date#30)) || (20181214 <=> date#30)) || (20181213 <=> date#30)) || (20181212 <=> date#30)) || (20181211 <=> date#30)) || (20181210 <=> date#30)) || (20181209 <=> date#30)), isnotnull(version#12), isnotnull(device#11), isnotnull(model#2486), ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))), ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))))
> right constraints is List(isnotnull(enddate#2361), isnotnull(startdate#2360), isnotnull(version#2359), isnotnull(model#2358))
> {code}
> The failed app's plan has an additional constraint on its left child:
> {code:java}
> ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
> {code}
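This is consistent with the set difference above comparing constraints structurally rather than semantically: a predicate that is already present in an equivalent but differently-shaped form is not cancelled out, so one rule can keep re-inferring a constraint in one shape while another rule keeps rewriting it into the other shape. A toy illustration (strings stand in for expression trees; purely hypothetical):

```python
# Constraints are compared structurally (like Catalyst expression trees),
# not semantically, so two equivalent predicates in different shapes
# do not cancel out in a set difference.
inferred = {"A || (B && C)"}         # shape produced by one rule
existing = {"(A || B) && (A || C)"}  # equivalent predicate, different shape

additional = inferred - existing     # structural set difference
print(additional)                    # the "new" constraint survives: {'A || (B && C)'}
```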
> 6. From here on, the gap between the two apps' plans keeps growing; one run succeeds while the other hangs. There seem to be two possible reasons:
> 1. BooleanSimplification is not idempotent
> 2. InferFiltersFromConstraints does not behave correctly when a child's constraints contain A || (B && C) instead of (A || B) && (A || C)
> I'm not sure which is the root cause; could someone look into this issue?
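Whichever rule is at fault, hypothesis 1 can be tested mechanically: a rule f is idempotent iff f(f(p)) == f(p) for every plan p. A generic check of that property might look like this (illustrative Python, not Spark code; the toy "rule" below is hypothetical):

```python
def check_idempotent(rule, plans):
    """Return the plans for which rule(rule(p)) != rule(p).

    An optimizer rule that fails this check can ping-pong with other
    rules in a fixed-point batch and prevent convergence.
    """
    return [p for p in plans if rule(rule(p)) != rule(p)]

# A toy non-idempotent "rule": it flips a flag on every application,
# so applying it twice never equals applying it once.
flip = lambda p: not p
print(check_idempotent(flip, [True, False]))  # both inputs fail the check
```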
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)