Posted to issues@spark.apache.org by "Asif (Jira)" <ji...@apache.org> on 2020/10/14 21:54:00 UTC

[jira] [Commented] (SPARK-33152) Constraint Propagation code causes OOM issues or increasing compilation time to hours

    [ https://issues.apache.org/jira/browse/SPARK-33152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17214298#comment-17214298 ] 

Asif commented on SPARK-33152:
------------------------------

I will be opening a PR for this.

> Constraint Propagation code causes OOM issues or increasing compilation time to hours
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-33152
>                 URL: https://issues.apache.org/jira/browse/SPARK-33152
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Asif
>            Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> We encountered this issue at Workday.
> The issue is that the current constraint propagation code pessimistically generates all possible permutations of a base constraint for the aliases in the Project node.
> This blows up the number of constraints generated, causing OOM errors while the SQL query is being compiled, or queries that take 18 min to 2 hrs to compile.
> The problematic piece of code is in LogicalPlan.getAliasedConstraints:
> projectList.foreach {
>   case a @ Alias(l: Literal, _) =>
>     allConstraints += EqualNullSafe(a.toAttribute, l)
>   case a @ Alias(e, _) =>
>     // For every alias in `projectList`, replace the reference in
>     // constraints by its attribute.
>     allConstraints ++= allConstraints.map(_ transform {
>       case expr: Expression if expr.semanticEquals(e) =>
>         a.toAttribute
>     })
>     allConstraints += EqualNullSafe(e, a.toAttribute)
>   case _ => // Don't change.
> }
> Consider a hypothetical plan:
>  
> Project (a, a as a1, a as a2, a as a3, b, b as b1, b as b2, c, c as c1, c as c2, c as c3)
>    |
> Filter f(a, b, c)
>    |
> Base Relation (a, b, c)
> So the projection gives us
> a, a1, a2, a3
> b, b1, b2
> c, c1, c2, c3
> Let's say, hypothetically, that f(a, b, c) has a occurring once, b occurring twice, and c occurring three times.
> So at the Project node, the number of constraints for a single base constraint f(a, b, c) will be
> 4C1 * 3C2 * 4C3 = 4 * 3 * 4 = 48
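The count above can be reproduced with a toy model of the loop quoted earlier. This is only a sketch under stated assumptions: a constraint is modelled as a plain vector of attribute names rather than a Catalyst Expression, the object and method names (AliasBlowUp, expand) are hypothetical, and the extra EqualNullSafe(e, alias) constraints that the real code also adds are left out.

```scala
object AliasBlowUp {
  // Toy stand-in for an expression: the attribute names it references, in order.
  type Constraint = Vector[String]

  // Mimics `allConstraints ++= allConstraints.map(_ transform {...})`:
  // for each alias, add a copy of every existing constraint in which
  // all occurrences of the aliased attribute are replaced by the alias.
  def expand(base: Set[Constraint], aliases: Seq[(String, String)]): Set[Constraint] =
    aliases.foldLeft(base) { case (all, (attr, alias)) =>
      all ++ all.map(_.map(name => if (name == attr) alias else name))
    }

  // Aliases from the hypothetical Project node: attribute -> alias.
  val aliases: Seq[(String, String)] = Seq(
    "a" -> "a1", "a" -> "a2", "a" -> "a3",
    "b" -> "b1", "b" -> "b2",
    "c" -> "c1", "c" -> "c2", "c" -> "c3")

  // f(a, b, c) with a occurring once, b twice and c three times.
  val baseConstraint: Constraint = Vector("a", "b", "b", "c", "c", "c")

  def main(args: Array[String]): Unit =
    println(expand(Set(baseConstraint), aliases).size) // prints 48
}
```

Every additional alias in the projection multiplies the set again, which is how a handful of complex expressions can grow into tens of thousands of constraints.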
> In our case, we have seen the number of constraints go above 30000, as there are complex case statements in the projection.
> Spark generates all these constraints pessimistically so that it can prune filters or push down predicates for joins that it may encounter as the optimizer traverses up the tree.
>  
> We solved this issue on our end by modifying the Spark code to use different logic.
> The idea is simple.
> Instead of pessimistically generating all possible combinations of a base constraint, just store the original base constraints and track the aliases at each level.
> The principle followed is this:
> 1) Store the base constraint and keep track of the aliases for the underlying attribute.
> 2) If a base attribute that the constraint is built on is not in the output set, check whether the constraint survives by substituting the removed attribute with the next available alias's attribute.
>  
> To check whether a filter can be pruned, just canonicalize the filter using the attribute at the 0th position of each tracking list and compare it with the underlying base constraint.
> To elaborate using the plan above:
> At the Project node
> we have the constraint f(a, b, c)
> and we keep track of the aliases:
> List 1: a, a1.attribute, a2.attribute, a3.attribute
> List 2: b, b1.attribute, b2.attribute
> List 3: c, c1.attribute, c2.attribute, c3.attribute
> Let's say that above the Project node we encounter a filter
> f(a1, b2, c3)
> We canonicalize the filter using the lists above, converting it to
> f(a, b, c), and compare it with the stored base constraints.
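The canonicalization step can be sketched as follows. This is an illustration only, not the code from the planned PR: attributes are modelled as plain strings, a filter as the sequence of names it references, and AliasCanonicalizer, canonicalize and canBePruned are hypothetical names.

```scala
object AliasCanonicalizer {
  // Each tracking list starts with the attribute used in the stored base
  // constraint (position 0), followed by its aliases.
  val trackingLists: Seq[Seq[String]] = Seq(
    Seq("a", "a1", "a2", "a3"),
    Seq("b", "b1", "b2"),
    Seq("c", "c1", "c2", "c3"))

  // Maps every tracked name to the element at position 0 of its list.
  val canonicalName: Map[String, String] =
    trackingLists.flatMap(list => list.map(_ -> list.head)).toMap

  // Rewrites a filter's attribute names to their canonical form;
  // names that are not tracked are left unchanged.
  def canonicalize(filterArgs: Seq[String]): Seq[String] =
    filterArgs.map(name => canonicalName.getOrElse(name, name))

  // A filter is redundant if its canonical form is a stored base constraint.
  def canBePruned(filterArgs: Seq[String], baseConstraints: Set[Seq[String]]): Boolean =
    baseConstraints.contains(canonicalize(filterArgs))
}
```

With the single stored constraint f(a, b, c), canBePruned(Seq("a1", "b2", "c3"), Set(Seq("a", "b", "c"))) is true, so the lookup costs one map access per attribute instead of a search through every aliased permutation.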
>  
> For predicate push down, instead of generating all the redundant combinations of constraints, just generate one constraint per element of each alias list.
> In the current Spark code, filter push down happens for only one variable at a time in any case.
> So just expanding the filter f(a, b, c) to
> f(a, b, c), f(a1, b, c), f(a2, b, c), f(a3, b, c), f(a, b1, c), f(a, b2, c), f(a, b, c1), f(a, b, c2), f(a, b, c3)
> would suffice, rather than generating all the redundant combinations.
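A minimal sketch of this one-variable-at-a-time expansion, using the same string-based toy model (SingleAliasExpansion and expand are hypothetical names, not Spark APIs):

```scala
object SingleAliasExpansion {
  // Returns the base constraint plus one variant per alias, where each
  // variant replaces exactly one base attribute with one of its aliases.
  def expand(base: Seq[String], trackingLists: Seq[Seq[String]]): Seq[Seq[String]] = {
    val variants = for {
      list  <- trackingLists // one tracking list per base attribute
      alias <- list.tail     // every alias after position 0
    } yield base.map(name => if (name == list.head) alias else name)
    base +: variants
  }
}
```

For f(a, b, c) with the three lists above this yields 1 + 3 + 2 + 3 = 9 constraints, so the size grows with the sum of the alias counts rather than their product.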
> In fact, the code can easily be modified to generate only those constraints that involve the variables forming the join condition, so the number of constraints generated on expansion is reduced further.
> We already have code to generate compound filters for push down (joins on multiple conditions), which can also be used for single-variable push down.
> Just to elaborate the logic further, consider the hypothetical plan above with a second Project on top (assume the collapse project rule is not applied):
>  
> Project (a1, a1 as a4, b, c1, c1 as c4)
>    |
> Project (a, a as a1, a as a2, a as a3, b, b as b1, b as b2, c, c as c1, c as c2, c as c3)
>    |
> Filter f(a, b, c)
>    |
> Base Relation (a, b, c)
>  
> So at the second Project node, the tracked alias lists become
> List 1: a1, a4.attribute
> List 2: b
> List 3: c1, c4.attribute
> and the constraint f(a, b, c) is rewritten to f(a1, b, c1).
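The update at the second Project node can be sketched like this. It is a sketch only: ProjectAliasTracking, remap and rewrite are hypothetical names, attributes are plain strings, and every incoming tracking list is assumed to be non-empty.

```scala
object ProjectAliasTracking {
  // passThrough: attributes the Project outputs unchanged.
  // newAliases:  (source attribute, alias name) pairs the Project introduces.
  // Each list keeps its surviving members and gains aliases of any member.
  def remap(lists: Seq[Seq[String]],
            passThrough: Set[String],
            newAliases: Seq[(String, String)]): Seq[Seq[String]] =
    lists.map { list =>
      val surviving = list.filter(passThrough.contains)
      val added = newAliases.collect { case (src, alias) if list.contains(src) => alias }
      surviving ++ added
    }

  // Replaces each old position-0 attribute with the new position-0 attribute.
  // Returns None when some list became empty, i.e. the constraint does not survive.
  def rewrite(constraint: Seq[String],
              oldLists: Seq[Seq[String]],
              newLists: Seq[Seq[String]]): Option[Seq[String]] = {
    val replacement: Map[String, Option[String]] =
      oldLists.zip(newLists).map { case (o, n) => o.head -> n.headOption }.toMap
    val rewritten = constraint.map(name => replacement.getOrElse(name, Some(name)))
    if (rewritten.forall(_.isDefined)) Some(rewritten.flatten) else None
  }
}
```

Calling remap on the three lists from the first Project with passThrough = Set("a1", "b", "c1") and newAliases = Seq("a1" -> "a4", "c1" -> "c4") gives (a1, a4), (b), (c1, c4), and rewrite then turns f(a, b, c) into f(a1, b, c1), matching the lists above.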
>  
> The above logic is also more effective at filter pruning than the current code, as some of our tests show, and it keeps the overhead of constraint propagation minimal.


