Posted to issues@flink.apache.org by "Konstantin Knauf (Jira)" <ji...@apache.org> on 2020/10/01 07:11:00 UTC
[jira] [Commented] (FLINK-18988) Continuous query with LATERAL and LIMIT produces wrong result
[ https://issues.apache.org/jira/browse/FLINK-18988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17205314#comment-17205314 ]
Konstantin Knauf commented on FLINK-18988:
------------------------------------------
[~danny0405] Thank you for working on this issue. Are you also planning to fix this for Flink 1.11 or only master?
> Continuous query with LATERAL and LIMIT produces wrong result
> -------------------------------------------------------------
>
> Key: FLINK-18988
> URL: https://issues.apache.org/jira/browse/FLINK-18988
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.1
> Reporter: Fabian Hueske
> Assignee: Danny Chen
> Priority: Critical
> Labels: pull-request-available
>
> I was trying out the example queries provided in this blog post: [https://materialize.io/lateral-joins-and-demand-driven-queries/] to check if Flink supports the same and found that the queries were translated and executed but produced the wrong result.
> I used the SQL Client and Kafka (running at kafka:9092) to store the table data. I executed the following statements:
> {code:sql}
> -- create cities table
> CREATE TABLE cities (
> name STRING NOT NULL,
> state STRING NOT NULL,
> pop INT NOT NULL
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'cities',
> 'properties.bootstrap.servers' = 'kafka:9092',
> 'properties.group.id' = 'mygroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'json'
> );
> -- fill cities table
> INSERT INTO cities VALUES
> ('Los_Angeles', 'CA', 3979576),
> ('Phoenix', 'AZ', 1680992),
> ('Houston', 'TX', 2320268),
> ('San_Diego', 'CA', 1423851),
> ('San_Francisco', 'CA', 881549),
> ('New_York', 'NY', 8336817),
> ('Dallas', 'TX', 1343573),
> ('San_Antonio', 'TX', 1547253),
> ('San_Jose', 'CA', 1021795),
> ('Chicago', 'IL', 2695598),
> ('Austin', 'TX', 978908);
> -- execute query
> SELECT state, name
> FROM
> (SELECT DISTINCT state FROM cities) states,
> LATERAL (
> SELECT name, pop
> FROM cities
> WHERE state = states.state
> ORDER BY pop DESC
> LIMIT 3
> );
> -- result
> state name
> CA Los_Angeles
> NY New_York
> IL Chicago
> -- expected result
> state | name
> ------+-------------
> TX | Dallas
> AZ | Phoenix
> IL | Chicago
> TX | Houston
> CA | San_Jose
> NY | New_York
> CA | San_Diego
> CA | Los_Angeles
> TX | San_Antonio
> {code}
> As you can see from the query result, Flink computes the top 3 cities over all states, not the top 3 for each state individually. Hence, I assume that this is a bug in the query optimizer or one of the rewriting rules.
> There are two valid ways to solve this issue:
> * Fixing the rewriting rules / optimizer (obviously preferred)
> * Disabling this feature and throwing an exception
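
The diagnosis in the description can be checked outside of Flink. The following is a minimal Python sketch, not Flink code: it copies the rows from the INSERT statement above and compares a top 3 over all states with a top 3 per state. The function names `global_top_n` and `per_state_top_n` are made up for this illustration.

```python
from collections import defaultdict

# Rows copied from the INSERT statement in the issue description: (name, state, pop).
CITIES = [
    ("Los_Angeles", "CA", 3979576),
    ("Phoenix", "AZ", 1680992),
    ("Houston", "TX", 2320268),
    ("San_Diego", "CA", 1423851),
    ("San_Francisco", "CA", 881549),
    ("New_York", "NY", 8336817),
    ("Dallas", "TX", 1343573),
    ("San_Antonio", "TX", 1547253),
    ("San_Jose", "CA", 1021795),
    ("Chicago", "IL", 2695598),
    ("Austin", "TX", 978908),
]


def global_top_n(rows, n=3):
    """Top n cities by population over all states (hypothetical helper)."""
    ranked = sorted(rows, key=lambda row: row[2], reverse=True)[:n]
    return {(state, name) for name, state, _pop in ranked}


def per_state_top_n(rows, n=3):
    """Top n cities by population within each state (hypothetical helper)."""
    by_state = defaultdict(list)
    for name, state, pop in rows:
        by_state[state].append((pop, name))
    result = set()
    for state, cities in by_state.items():
        # Sort each state's cities by population, largest first, and keep n.
        for _pop, name in sorted(cities, reverse=True)[:n]:
            result.add((state, name))
    return result


if __name__ == "__main__":
    print("global top 3:   ", sorted(global_top_n(CITIES)))
    print("per-state top 3:", sorted(per_state_top_n(CITIES)))
```

With the data from the issue, the global variant yields the three rows listed under "result", and the per-state variant yields the nine rows listed under "expected result".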
--
This message was sent by Atlassian Jira
(v8.3.4#803005)