Posted to dev@calcite.apache.org by Haisheng Yuan <h....@alibaba-inc.com> on 2019/03/09 04:36:40 UTC

Re: Two problems recently reported in Flink

Hi Fabian,
You mentioned:
> Another problem are join predicates in the ON clause that reference the
> inner and outer table.

I don’t see why this is a problem. Even if the join predicate references both the inner and outer relations,
pushing the filter down to the inner side can still work, by using a correlation variable.
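
To make this concrete, here is a minimal Python sketch (a toy model, not Calcite code) of a left Correlate whose ON predicate is evaluated inside the right-hand input, with the outer row visible as the correlation variable. The `employees` function, the sample rows, and the `left_correlate` helper are all made up for illustration.

```python
# Toy model of a left Correlate; none of these names are Calcite APIs.

def employees(deptno):
    """Hypothetical table function: (name, age) rows for a department."""
    data = {
        10: [("Alice", 35), ("Bob", 8)],
        20: [("Carol", 15)],
        30: [],
    }
    return data.get(deptno, [])


def left_correlate(outer_rows, right_side):
    """Evaluate right_side once per outer row; pad with None if it is empty."""
    result = []
    for outer in outer_rows:
        inner_rows = list(right_side(outer))
        if inner_rows:
            result.extend(outer + inner for inner in inner_rows)
        else:
            result.append(outer + (None, None))
    return result


def filtered_right_side(cor0):
    # The predicate e.age > d.deptno references both sides. It is applied
    # inside the right input, where the outer row is available as cor0.
    deptno = cor0[0]
    return [e for e in employees(deptno) if e[1] > deptno]


depts = [(10, "Sales"), (20, "Marketing"), (30, "HR")]
pushed_down = left_correlate(depts, filtered_right_side)
for row in pushed_down:
    print(row)
```

Because the filter runs before the null padding, every department still appears at least once, including those whose employees were all filtered out.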

I have opened a pull request for the first task below (pushing down the filter of a lateral join), and it should
be able to resolve this issue: https://github.com/apache/calcite/pull/1096

Thanks,
Haisheng Yuan


On 2017/10/11 09:07:56, Fabian Hueske <f....@gmail.com> wrote: 
> I think that makes a lot of sense.> 
> Adding a predicate to the Correlate operator seems to be a big effort.> 
> 
> The OUTER APPLY syntax does not offer an ON clause. So we would not need a> 
> predicate in an Apply operator if we only want to support this syntax.> 
> However, if we want to translate OUTER LATERAL JOINs against table> 
> functions (or SelectMany) we would need that.> 
> Moreover, the motivation to add an Apply operator is to have a predicate in> 
> the operator.> 
> 
> Breaking this down into separate tasks:> 
> 
> 1. Translate local predicates of OUTER LATERAL JOIN to a filter on the> 
> lateral table.> 
> 2. Disable OUTER LATERAL JOIN with join predicates. These cannot be> 
> correctly translated without adding a predicate to the Correlate operator.> 
> 3. Add CrossApply and OuterApply operators with join predicate, translate> 
> APPLY syntax to Apply operators, and add translation rules.> 
> 4. Translate LATERAL JOIN against a table function to APPLY operators. This> 
> enables OUTER LATERAL JOIN against a table function with join predicates.> 
> 
> If you agree, I'd create the respective JIRAs.> 
> 
> Best, Fabian> 
> 
> 2017-10-10 21:20 GMT+02:00 Julian Hyde <jh...@apache.org>:> 
> 
> > I can’t think of a good solution.> 
> >> 
> > Consider what is involved to (properly) add a predicate to the Correlate> 
> > operator. You would need to modify all the rules that involve Correlate,> 
> > and add new rules to push predicates into, and through, that filter, and> 
> > add tests for all of this. Rewrite rules will use the new form of Correlate> 
> > automatically for “ordinary” queries, and if you haven’t tested it> 
> > extensively, those queries will break.> 
> >> 
> > I don’t think adding a filter to Correlate is a bad idea. It’s just a lot> 
> > of work.> 
> >> 
> > Also, Correlate isn’t an easy operator to implement efficiently. The> 
> > obvious implementation requires the right-hand side of the tree to be> 
> > restarted — expensive if the query is distributed, and impossible if it is> 
> > streaming. This is why we take such great pains to de-correlate queries.> 
> >> 
> > I wonder whether it would be better to implement APPLY[1] (and its> 
> > variants CROSS APPLY and LEFT APPLY) as a first-class relational operator.> 
> > It captures the fact that the right-hand side is a leaf node (a call to a> 
> > function). And would also apply UNNEST when the function returns several> 
> > rows. In other words, it is like selectMany(), described as “LINQ’s most> 
> > powerful operator”[2] because it can implement Project, Filter, and more> 
> > besides. As a single-input operator, it is easier to reason about.> 
> >> 
> > Consider the query to find all employees whose age is greater than their> 
> > department number, using a function to generate the employees, and printing> 
> > each department at least once even if it has no employees old enough.> 
> >> 
> >  SELECT d.deptno, d.name, e.name
> >  FROM dept AS d JOIN LATERAL TABLE(Employees(d.deptno)) AS e ON e.age > d.deptno
> >> 
> > becomes> 
> >> 
> >  Apply(let e = Employees($0) in if empty(e) then {($0, $1, null)} else
> >        map(e, e -> ($0, $1, e.name)) end end)
> >    Scan(dept)
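
The Apply expression above can be mimicked with a selectMany-style helper. This is a rough Python sketch under stated assumptions: `apply_op`, `employees`, and the sample data are hypothetical, and the age filter is folded into the per-row function, which the expression above does not spell out.

```python
# Toy model of Apply as a single-input operator; not Calcite code.

def employees(deptno):
    """Hypothetical table function: (name, age) rows for a department."""
    data = {
        10: [("Alice", 35), ("Bob", 8)],
        20: [("Carol", 15)],
    }
    return data.get(deptno, [])


def apply_op(input_rows, fn):
    """selectMany-like: fn maps one input row to zero or more output rows."""
    out = []
    for row in input_rows:
        out.extend(fn(row))
    return out


def dept_with_old_enough_employees(dept):
    deptno, dname = dept
    # Assumption: the ON predicate (e.age > d.deptno) is evaluated here.
    e = [emp for emp in employees(deptno) if emp[1] > deptno]
    if not e:
        # Outer semantics: emit the department once, padded with null.
        return [(deptno, dname, None)]
    return [(deptno, dname, name) for name, _age in e]


depts = [(10, "Sales"), (20, "Marketing")]
result = apply_op(depts, dept_with_old_enough_employees)
for row in result:
    print(row)
```

The operator has only one relational input (the scan of dept); the function call, the filter, and the null padding all live inside the per-row function.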
> >> 
> > Julian> 
> >> 
> > [1] https://issues.apache.org/jira/browse/CALCITE-1472
> >
> > [2] https://dzone.com/articles/selectmany-probably-the-most-p
> >> 
> > > On Oct 10, 2017, at 7:56 AM, Fabian Hueske <fh...@gmail.com> wrote:> 
> > >> 
> > > Thanks Julian,> 
> > > Xingcan created CALCITE-2004 to track the issue.> 
> > >> 
> > > > I think applying a local predicate that is defined in the ON clause on the
> > > > inner input before the join solves one part of the issue.
> > > > Another problem are join predicates in the ON clause that reference the
> > > > inner and outer table. These would need to be applied by the correlate
> > > > operator to ensure that outer rows are preserved.
> > >> 
> > > We might need to add a predicate to the Correlate operator or do you have> 
> > > another idea?> 
> > >> 
> > > Thanks, Fabian> 
> > >> 
> > >> 
> > > 2017-10-07 20:53 GMT+02:00 Julian Hyde <jh...@apache.org>:> 
> > >> 
> > > >> You are correct. Applying the Filter after the Correlate doesn’t give the
> > > >> right behavior. Can you log a bug?
> > > >>
> > > >> I’d rather not add filters to the Correlate operator unless absolutely
> > > >> necessary. In this case, is it sufficient to apply the Filter to the right
> > > >> input of the Correlate?
> > >>> 
> > >> Julian> 
> > >>> 
> > >>> 
> > >>> On Oct 4, 2017, at 2:20 PM, Fabian Hueske <fh...@gmail.com> wrote:> 
> > >>>> 
> > >>> Thanks for the quick reply, Julian.> 
> > >>>> 
> > >>> It's great that the first issue is already fixed in 1.14!> 
> > >>>> 
> > >>> Regarding the second issue.> 
> > > >>> The problem with the logical plan that Xingcan posted is not the handling
> > > >>> of outer rows for which the right side is empty.
> > > >>> Our concern is that the outer join predicate is pushed into a LogicalFilter
> > > >>> and not kept together with the left join. Shouldn't an outer join predicate
> > > >>> be evaluated by the join itself and not in a subsequent filter?
> > > >>> Otherwise an outer row might be completely filtered out (instead of being
> > > >>> padded with null) if the join predicate filter filters out all join results
> > > >>> produced by a correlate join for the outer row.
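
The lost-row effect described above can be reproduced with a small Python sketch. It is only a toy model of "left Correlate, then Filter": the `split` implementation (one row per letter, with the word length) and the sample data are assumptions made for illustration.

```python
# Toy model of LogicalFilter on top of a left LogicalCorrelate; not Calcite code.

def split(word):
    """Hypothetical table function: one (letter, length) row per letter."""
    return [(letter, len(word)) for letter in word]


def left_correlate(outer_rows, table_function):
    """Left correlate: pad with None only when the function returns no rows."""
    result = []
    for word, frequency in outer_rows:
        inner_rows = table_function(word)
        if inner_rows:
            result.extend((word, frequency) + inner for inner in inner_rows)
        else:
            result.append((word, frequency, None, None))
    return result


def join_predicate(row):
    # frequency = length OR length < 5; a NULL length never evaluates to true.
    _word, frequency, _letter, length = row
    return length is not None and (frequency == length or length < 5)


word_count = [("ab", 2), ("calcite", 3)]

# Filter applied after the correlate, as in the posted logical plan.
filter_after = [r for r in left_correlate(word_count, split) if join_predicate(r)]
lost_words = {w for w, _ in word_count} - {r[0] for r in filter_after}
print(filter_after)
print(lost_words)
```

Every row produced for "calcite" fails the predicate, so the word disappears from the result instead of surviving as a single null-padded row, which is what LEFT JOIN semantics would require.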
> > >>>> 
> > > >>> Of course it is possible to add a rule that converts LogicalCorrelate ->
> > > >>> LogicalFilter(joinPredicate) into FlinkCorrelate(joinPredicate).
> > > >>> However, I think it would be good to directly support join predicates in
> > > >>> LogicalCorrelate because this should be an issue for all systems that
> > > >>> support outer joins with table functions.
> > >>>> 
> > >>> Does that make sense?> 
> > >>>> 
> > >>> Thanks, Fabian> 
> > >>>> 
> > >>> 2017-10-04 22:24 GMT+02:00 Julian Hyde <jh...@apache.org>:> 
> > >>>> 
> > > >>>> Many thanks for raising these issues. See comments inline.
> > >>>>> 
> > > >>>>> 1. Using NULL literal causes NPE.
> > > >>>>> It seems that the constant NULL in Calcite is represented as a RexLiteral
> > > >>>>> with a (null: Comparable) value. In RexUtil.gatherConstraint(), the
> > > >>>>> equals() method is invoked on the value returned by (NULL: RexLiteral),
> > > >>>>> which is null, and that causes the NPE.
> > >>>>> 
> > > >>>> The latest RexUtil.gatherConstraint does not call Object.equals(Object);
> > > >>>> it calls Objects.equals(Object, Object). See
> > > >>>> https://issues.apache.org/jira/browse/CALCITE-1860, which is fixed in 1.14
> > > >>>> (which will be released in the next day or two).
> > >>>>> 
> > >>>>>> 
> > >>>>> 2. The TableFunction left outer join works incorrectly.> 
> > > >>>>> For instance, given a simple table {WordCount(word:String, frequency:Int)},
> > > >>>>> a table function {split: word:String => (letter:String, length:String)},
> > > >>>>> and a query "SELECT word, letter, length FROM WordCount LEFT JOIN LATERAL
> > > >>>>> TABLE(split(word)) AS T (letter, length) ON frequency = length OR length < 5",
> > > >>>>> the query will be translated to the logical plan below.
> > >>>>>> 
> > > >>>>> LogicalProject(word=[$0], name=[$2], length=[$3])
> > > >>>>>   LogicalFilter(condition=[OR(=($1, CAST($3):BIGINT), <($3, 5))])
> > > >>>>>     LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0}])
> > > >>>>>       LogicalTableScan(table=[[WordCount]])
> > > >>>>>       LogicalTableFunctionScan(invocation=[split($cor0.word)], rowType=[RecordType(VARCHAR(65536) _1, INTEGER _2)], elementType=[class [Ljava.lang.Object;])
> > >>>>>> 
> > > >>>>> This logical plan may lead to an improper physical plan, which first
> > > >>>>> correlates each row with its table function results (just like performing a
> > > >>>>> cartesian product) and then filters the rows. IMO, it only works for inner
> > > >>>>> join, but not for left outer join.
> > >>>>> 
> > > >>>> Note that LogicalCorrelate has joinType=left. This should cause a NULL
> > > >>>> record to be emitted on the right if no records are generated, which is the
> > > >>>> right behavior.
> > > >>>>
> > > >>>> Now, maybe that logical plan is being incorrectly converted to a physical
> > > >>>> plan. If so please log a JIRA case.
> > >>>>> 
> > >>>> Julian> 
> > >>>>> 
> > >>>>> 
> > >>> 
> > >>> 
> >> 
> >> 
>