Posted to issues@flink.apache.org by "Konstantin Knauf (Jira)" <ji...@apache.org> on 2020/10/01 07:11:00 UTC

[jira] [Commented] (FLINK-18988) Continuous query with LATERAL and LIMIT produces wrong result

    [ https://issues.apache.org/jira/browse/FLINK-18988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17205314#comment-17205314 ] 

Konstantin Knauf commented on FLINK-18988:
------------------------------------------

[~danny0405] Thank you for working on this issue. Are you also planning to fix this for Flink 1.11 or only master?

> Continuous query with LATERAL and LIMIT produces wrong result
> -------------------------------------------------------------
>
>                 Key: FLINK-18988
>                 URL: https://issues.apache.org/jira/browse/FLINK-18988
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.1
>            Reporter: Fabian Hueske
>            Assignee: Danny Chen
>            Priority: Critical
>              Labels: pull-request-available
>
> I was trying out the example queries from this blog post: [https://materialize.io/lateral-joins-and-demand-driven-queries/] to check whether Flink supports them. Flink translated and executed the queries, but they produced the wrong result.
> I used the SQL Client, with the table data stored in Kafka (running at kafka:9092). I executed the following statements:
> {code:sql}
> -- create cities table
> CREATE TABLE cities (
>   name STRING NOT NULL,
>   state STRING NOT NULL,
>   pop INT NOT NULL
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'cities',
>   'properties.bootstrap.servers' = 'kafka:9092',
>   'properties.group.id' = 'mygroup', 
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'json'
> );
> -- fill cities table
> INSERT INTO cities VALUES
>   ('Los_Angeles', 'CA', 3979576),
>   ('Phoenix', 'AZ', 1680992),
>   ('Houston', 'TX', 2320268),
>   ('San_Diego', 'CA', 1423851),
>   ('San_Francisco', 'CA', 881549),
>   ('New_York', 'NY', 8336817),
>   ('Dallas', 'TX', 1343573),
>   ('San_Antonio', 'TX', 1547253),
>   ('San_Jose', 'CA', 1021795),
>   ('Chicago', 'IL', 2695598),
>   ('Austin', 'TX', 978908);
> -- execute query
> SELECT state, name
> FROM
>   (SELECT DISTINCT state FROM cities) states,
>   LATERAL (
>     SELECT name, pop
>     FROM cities
>     WHERE state = states.state
>     ORDER BY pop DESC
>     LIMIT 3
>   );
> -- actual result
> state | name
> ------+-------------
> CA    | Los_Angeles
> NY    | New_York
> IL    | Chicago
> -- expected result
> state | name
> ------+-------------
> TX    | Dallas
> AZ    | Phoenix
> IL    | Chicago
> TX    | Houston
> CA    | San_Jose
> NY    | New_York
> CA    | San_Diego
> CA    | Los_Angeles
> TX    | San_Antonio
> {code}
> As the query result shows, Flink computes the top 3 cities across all states instead of the top 3 cities for each state. Hence, I assume that this is a bug in the query optimizer or in one of the rewriting rules.
> There are two valid ways to solve this issue:
>  * Fixing the rewriting rules / optimizer (obviously preferred)
>  * Disabling this feature and throwing an exception
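
The diagnosis in the description can be checked with plain arithmetic on the eleven inserted rows. The following is a minimal Python sketch, not Flink code and not how the planner evaluates the query, that computes both the per-state top 3 that the correlated LATERAL subquery should yield and a global top 3 with a single LIMIT over all states. The function names `top_n_per_state` and `top_n_global` are hypothetical, chosen only for illustration.

```python
from collections import defaultdict

# Rows from the INSERT INTO cities statement: (name, state, pop)
CITIES = [
    ("Los_Angeles", "CA", 3979576),
    ("Phoenix", "AZ", 1680992),
    ("Houston", "TX", 2320268),
    ("San_Diego", "CA", 1423851),
    ("San_Francisco", "CA", 881549),
    ("New_York", "NY", 8336817),
    ("Dallas", "TX", 1343573),
    ("San_Antonio", "TX", 1547253),
    ("San_Jose", "CA", 1021795),
    ("Chicago", "IL", 2695598),
    ("Austin", "TX", 978908),
]


def top_n_per_state(rows, n=3):
    """Correlated semantics: for each distinct state, its n most populous cities."""
    by_state = defaultdict(list)
    for name, state, pop in rows:
        by_state[state].append((name, pop))
    result = set()
    for state, cities in by_state.items():
        # ORDER BY pop DESC LIMIT n, evaluated once per state
        for name, _ in sorted(cities, key=lambda c: c[1], reverse=True)[:n]:
            result.add((state, name))
    return result


def top_n_global(rows, n=3):
    """LIMIT applied once over all states, ignoring the correlation."""
    ranked = sorted(rows, key=lambda r: r[2], reverse=True)[:n]
    return {(state, name) for name, state, _ in ranked}


if __name__ == "__main__":
    for state, name in sorted(top_n_per_state(CITIES)):
        print(state, name)
```

The per-state variant yields the nine rows of the expected result (three each for CA and TX, one each for AZ, IL and NY), while the global variant yields exactly the three rows Flink returned (New_York, Los_Angeles, Chicago). Sets are used because the SQL result has no defined row order.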


