Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/02/19 13:17:00 UTC

[jira] [Comment Edited] (FLINK-14533) PushFilterIntoTableSourceScanRule misses predicate pushdowns

    [ https://issues.apache.org/jira/browse/FLINK-14533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17040012#comment-17040012 ] 

Jark Wu edited comment on FLINK-14533 at 2/19/20 1:16 PM:
----------------------------------------------------------

Hi [~Yuval.Itzchakov], I didn't cherry-pick it back to 1.10 and 1.9, because it may change the plan, and the community guarantees that SQL job state can be reused across minor releases. Hope that makes sense to you. 


> PushFilterIntoTableSourceScanRule misses predicate pushdowns
> ------------------------------------------------------------
>
>                 Key: FLINK-14533
>                 URL: https://issues.apache.org/jira/browse/FLINK-14533
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.8.1, 1.8.2, 1.9.0, 1.9.1
>            Reporter: Yuval Itzchakov
>            Assignee: Yuval Itzchakov
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> When Flink attempts predicate pushdown via `PushFilterIntoTableSourceScanRule`, it first checks whether the RexNodes can actually be pushed down to the source. It does that via `RexNodeToExpressionConverter.visitCall`, which traverses the nodes and checks whether each call is an operator it knows how to convert:
> {code:scala}
> call.getOperator match {
>   // OR and AND are folded directly into planner expressions.
>   case SqlStdOperatorTable.OR =>
>     Option(operands.reduceLeft { (l, r) =>
>       Or(l.asInstanceOf[PlannerExpression], r.asInstanceOf[PlannerExpression])
>     })
>   case SqlStdOperatorTable.AND =>
>     Option(operands.reduceLeft { (l, r) =>
>       And(l.asInstanceOf[PlannerExpression], r.asInstanceOf[PlannerExpression])
>     })
>   // Everything else is resolved by a name-based catalog lookup.
>   case function: SqlFunction => lookupFunction(replace(function.getName), operands)
>   case postfix: SqlPostfixOperator => lookupFunction(replace(postfix.getName), operands)
>   case operator@_ => lookupFunction(replace(s"${operator.getKind}"), operands)
> }
> {code}
> Take the following query as an example:
> {code:sql}
> SELECT a, b, c 
> FROM d 
> WHERE LOWER(a) LIKE '%foo%' AND LOWER(b) LIKE '%python%'
> {code}
> When we hit the above pattern match, we fall into the case matching `SqlFunction`, since `LOWER` is of that type. Inside `lookupFunction`, a call to `functionCatalog.lookupFunction(name)` looks the given function up in the function catalog. Eventually we reach a static class called `BuiltInFunctionDefinitions`, which defines all of Flink's built-in functions, and iterate its list of definitions as follows:
> {code:java}
> foundDefinition = BuiltInFunctionDefinitions.getDefinitions()
> 				.stream()
> 				.filter(f -> functionName.equals(normalizeName(f.getName())))
> 				.findFirst()
> 				.map(Function.identity());
> {code}
>  This doesn't yield a result, because `LOWER`, inside `BuiltInFunctionDefinitions`, is defined as follows:
> {code:java}
> public static final BuiltInFunctionDefinition LOWER =
> 		new BuiltInFunctionDefinition.Builder()
> 			.name("lowerCase")
> 			.kind(SCALAR)
> 			.outputTypeStrategy(TypeStrategies.MISSING)
> 			.build();
> {code}
>  
> And since the lookup compares names as plain strings, `LOWER` never matches `lowerCase`; the lookup returns `null` and the entire pushdown fails.
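>  
> For illustration, here is a minimal, self-contained sketch of the mismatch. The definitions list and the `normalizeName` helper below are simplified stand-ins for the real Flink code (which, as far as I can tell, normalizes names to lower case), not the actual implementation:
> {code:java}
> import java.util.Arrays;
> import java.util.List;
> import java.util.Optional;
> 
> public class LookupMismatch {
> 	// Simplified stand-in for the catalog's name normalization,
> 	// which lower-cases the name it is given.
> 	private static String normalizeName(String name) {
> 		return name.toLowerCase();
> 	}
> 
> 	public static void main(String[] args) {
> 		// Stand-in for BuiltInFunctionDefinitions.getDefinitions():
> 		// the LOWER function is registered under the name "lowerCase".
> 		List<String> definitions = Arrays.asList("lowerCase", "upperCase", "trim");
> 
> 		// The planner looks the function up by its SQL name.
> 		String functionName = normalizeName("LOWER"); // -> "lower"
> 
> 		Optional<String> found = definitions.stream()
> 				.filter(f -> functionName.equals(normalizeName(f)))
> 				.findFirst();
> 
> 		// Prints "Optional.empty": "lower" never equals "lowercase",
> 		// so the lookup fails and the pushdown is abandoned.
> 		System.out.println(found);
> 	}
> }
> {code}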
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)