Posted to dev@calcite.apache.org by Haisheng Yuan <h....@alibaba-inc.com> on 2019/03/09 04:36:40 UTC
Re: Two problems recently reported in Flink
Hi Fabian,
You mentioned:
> > > Another problem are join predicates in the ON clause that reference the
> > > inner and outer table.
I don’t see why this is a problem. Even if the join predicate references both the inner and outer relations,
pushing the filter down to the inner side can still work by using a correlation variable.
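
To make the difference concrete, here is a minimal Python sketch. It is not Calcite code; it only models the semantics. The names split, on_predicate and left_correlate are hypothetical, and I am assuming split returns one (letter, length) row per character, with length being the word's length. It compares filtering after a left correlate with evaluating the ON predicate on the inner side, where the outer row is visible as the correlation variable.

```python
def split(word):
    # Hypothetical table function: one (letter, length) row per character.
    return [(ch, len(word)) for ch in word]


def on_predicate(outer, inner):
    # ON frequency = length OR length < 5 (references both sides).
    _word, frequency = outer
    _letter, length = inner
    if length is None:
        return False  # SQL would yield UNKNOWN here, which a filter rejects
    return frequency == length or length < 5


def left_correlate(outer_rows, inner_fn):
    # Left correlate: re-evaluate the inner side per outer row and
    # pad with NULLs when the inner side produces no rows.
    out = []
    for outer in outer_rows:
        matches = inner_fn(outer)
        if matches:
            out.extend(outer + inner for inner in matches)
        else:
            out.append(outer + (None, None))
    return out


def filter_after_correlate(outer_rows):
    # Shape of the reported plan: Filter on top of Correlate(left).
    joined = left_correlate(outer_rows, lambda o: split(o[0]))
    return [r for r in joined if on_predicate(r[:2], r[2:])]


def filter_pushed_to_inner(outer_rows):
    # Predicate evaluated on the inner side; `o` plays the role of $cor0.
    return left_correlate(
        outer_rows,
        lambda o: [i for i in split(o[0]) if on_predicate(o, i)],
    )


words = [("hi", 2), ("planner", 3)]
after = filter_after_correlate(words)
pushed = filter_pushed_to_inner(words)
```

With this data, no inner row of "planner" satisfies the predicate, so the filter-on-top variant drops the outer row entirely, while the pushed-down variant keeps it padded with NULLs.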
I have opened a pull request that completes the first task below (pushing down the filter of a lateral join), and it
should resolve this issue: https://github.com/apache/calcite/pull/1096
Thanks,
Haisheng Yuan
On 2017/10/11 09:07:56, Fabian Hueske <f....@gmail.com> wrote:
> I think that makes a lot of sense.
> Adding a predicate to the Correlate operator seems to be a big effort.
>
> The OUTER APPLY syntax does not offer an ON clause. So we would not need a
> predicate in an Apply operator if we only want to support this syntax.
> However, if we want to translate OUTER LATERAL JOINs against table
> functions (or SelectMany) we would need that.
> Moreover, the motivation to add an Apply operator is to have a predicate in
> the operator.
>
> Breaking this down into separate tasks:
>
> 1. Translate local predicates of OUTER LATERAL JOIN to a filter on the
> lateral table.
> 2. Disable OUTER LATERAL JOIN with join predicates. These cannot be
> correctly translated without adding a predicate to the Correlate operator.
> 3. Add CrossApply and OuterApply operators with join predicate, translate
> APPLY syntax to Apply operators, and add translation rules.
> 4. Translate LATERAL JOIN against a table function to APPLY operators. This
> enables OUTER LATERAL JOIN against a table function with join predicates.
>
> If you agree, I'd create the respective JIRAs.
>
> Best, Fabian
>
> 2017-10-10 21:20 GMT+02:00 Julian Hyde <jh...@apache.org>:
>
> > I can’t think of a good solution.
> >
> > Consider what is involved to (properly) add a predicate to the Correlate
> > operator. You would need to modify all the rules that involve Correlate,
> > and add new rules to push predicates into, and through, that filter, and
> > add tests for all of this. Rewrite rules will use the new form of Correlate
> > automatically for “ordinary” queries, and if you haven’t tested it
> > extensively, those queries will break.
> >
> > I don’t think adding a filter to Correlate is a bad idea. It’s just a lot
> > of work.
> >
> > Also, Correlate isn’t an easy operator to implement efficiently. The
> > obvious implementation requires the right-hand side of the tree to be
> > restarted (expensive if the query is distributed, and impossible if it is
> > streaming). This is why we take such great pains to de-correlate queries.
> >
> > I wonder whether it would be better to implement APPLY[1] (and its
> > variants CROSS APPLY and LEFT APPLY) as a first-class relational operator.
> > It captures the fact that the right-hand side is a leaf node (a call to a
> > function). And would also apply UNNEST when the function returns several
> > rows. In other words, it is like selectMany(), described as “LINQ’s most
> > powerful operator”[2] because it can implement Project, Filter, and more
> > besides. As a single-input operator, it is easier to reason about.
> >
> > Consider the query to find all employees whose age is greater than their
> > department number, using a function to generate the employees, and printing
> > each department at least once even if it has no employees old enough.
> >
> > SELECT d.deptno, d.name, e.name
> > FROM dept AS d JOIN LATERAL TABLE(Employees(d.deptno)) AS e
> >   ON e.age > d.deptno
> >
> > becomes
> >
> > Apply(let e = Employees($0) in if empty(e) then {($0, $1, null)} else
> >     map(e, e -> ($0, $1, e.name)) end end)
> >   Scan(dept)
> >
> > Julian
> >
> > [1] https://issues.apache.org/jira/browse/CALCITE-1472
> >
> > [2] https://dzone.com/articles/selectmany-probably-the-most-p
> >
> > > On Oct 10, 2017, at 7:56 AM, Fabian Hueske <fh...@gmail.com> wrote:
> > >
> > > Thanks Julian,
> > > Xingcan created CALCITE-2004 to track the issue.
> > >
> > > I think applying a local predicate that is defined in the ON clause on the
> > > inner input before the join solves one part of the issue.
> > > Another problem are join predicates in the ON clause that reference the
> > > inner and outer table. These would need to be applied by the correlate
> > > operator to ensure that outer rows are preserved.
> > >
> > > We might need to add a predicate to the Correlate operator or do you have
> > > another idea?
> > >
> > > Thanks, Fabian
> > >
> > >
> > > 2017-10-07 20:53 GMT+02:00 Julian Hyde <jh...@apache.org>:
> > >
> > >> You are correct. Applying the Filter after the Correlate doesn’t give the
> > >> right behavior. Can you log a bug?
> > >>
> > >> I’d rather not add filters to the Correlate operator unless absolutely
> > >> necessary. In this case, is it sufficient to apply the Filter to the right
> > >> input of the Correlate?
> > >>
> > >> Julian
> > >>
> > >>
> > >>> On Oct 4, 2017, at 2:20 PM, Fabian Hueske <fh...@gmail.com> wrote:
> > >>>
> > >>> Thanks for the quick reply, Julian.
> > >>>
> > >>> It's great that the first issue is already fixed in 1.14!
> > >>>
> > >>> Regarding the second issue.
> > >>> The problem with the logical plan that Xingcan posted is not the handling
> > >>> of outer rows for which the right side is empty.
> > >>> Our concern is that the outer join predicate is pushed into a LogicalFilter
> > >>> and not kept together with the left join. Shouldn't an outer join predicate
> > >>> be evaluated by the join itself and not in a subsequent filter?
> > >>> Otherwise an outer row might be completely filtered out (instead of being
> > >>> padded with null) if the join predicate filter filters out all join results
> > >>> produced by a correlate join for the outer row.
> > >>>
> > >>> Of course it is possible to add a rule that converts LogicalCorrelate ->
> > >>> LogicalFilter(joinPredicate) into FlinkCorrelate(joinPredicate).
> > >>> However, I think it would be good to directly support join predicates in
> > >>> LogicalCorrelate because this should be an issue for all systems that
> > >>> support outer joins with table functions.
> > >>>
> > >>> Does that make sense?
> > >>>
> > >>> Thanks, Fabian
> > >>>
> > >>> 2017-10-04 22:24 GMT+02:00 Julian Hyde <jh...@apache.org>:
> > >>>
> > >>>> Many thanks for raising these issues. See comments inline.
> > >>>>
> > >>>>> 1. Using NULL literal causes NPE.
> > >>>>> It seems that the constant NULL in Calcite is represented as a RexLiteral
> > >>>>> with a (null: Comparable) value. In RexUtil.gatherConstraint(), the
> > >>>>> equals() method is invoked by the value returned by (NULL: RexLiteral),
> > >>>>> which is null, and that causes the NPE.
> > >>>>
> > >>>> The latest RexUtil.gatherConstraint does not call Object.equals(Object);
> > >>>> it calls Objects.equals(Object, Object). See
> > >>>> https://issues.apache.org/jira/browse/CALCITE-1860, which is fixed in 1.14
> > >>>> (which will be released in the next day or two).
> > >>>>
> > >>>>>
> > >>>>> 2. The TableFunction left outer join works incorrectly.
> > >>>>> For instance, given a simple table {WordCount(word:String, frequency:Int)},
> > >>>>> a table function {split: word:String => (letter:String, length:String)},
> > >>>>> and a query "SELECT word, letter, length FROM WordCount LEFT JOIN LATERAL
> > >>>>> TABLE(split(word)) AS T (letter, length) ON frequency = length OR length
> > >>>>> < 5", the query will be translated to the logical plan below.
> > >>>>>
> > >>>>> LogicalProject(word=[$0], name=[$2], length=[$3])
> > >>>>>   LogicalFilter(condition=[OR(=($1, CAST($3):BIGINT), <($3, 5))])
> > >>>>>     LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0}])
> > >>>>>       LogicalTableScan(table=[[WordCount]])
> > >>>>>       LogicalTableFunctionScan(invocation=[split($cor0.word)], rowType=[RecordType(VARCHAR(65536) _1, INTEGER _2)], elementType=[class [Ljava.lang.Object;])
> > >>>>>
> > >>>>> This logical plan may lead to an improper physical plan, which first
> > >>>>> correlates each row with its table function results (just like performing a
> > >>>>> cartesian product) and then filters the rows. IMO, it only works for inner
> > >>>>> join, but not for left outer join.
> > >>>>
> > >>>> Note that LogicalCorrelate has joinType=left. This should cause a NULL
> > >>>> record to be emitted on the right if no records are generated, which is the
> > >>>> right behavior.
> > >>>>
> > >>>> Now, maybe that logical plan is being incorrectly converted to a physical
> > >>>> plan. If so please log a JIRA case.
> > >>>>
> > >>>> Julian
> > >>>>
> > >>>>
> > >>>
> > >>>
> >>
> >>
>