Posted to dev@flink.apache.org by "Xuannan Su (Jira)" <ji...@apache.org> on 2022/08/16 07:59:00 UTC

[jira] [Created] (FLINK-28988) Incorrect result for filter after temporal join

Xuannan Su created FLINK-28988:
----------------------------------

             Summary: Incorrect result for filter after temporal join
                 Key: FLINK-28988
                 URL: https://issues.apache.org/jira/browse/FLINK-28988
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.15.1
            Reporter: Xuannan Su


The following code reproduces the issue:

 
{code:java}
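import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Instant;

import static org.apache.flink.table.api.Expressions.$;

public class TemporalJoinSQLExample1 {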

    public static void main(String[] args) throws Exception {

        // set up the Java DataStream API
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set up the Java Table API
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
                env.fromElements(
                        new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
                        new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
                        new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));

        final Table table =
                tableEnv.fromDataStream(
                                ds,
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT())
                                        .column("f1", DataTypes.STRING())
                                        .column("f2", DataTypes.TIMESTAMP_LTZ(3))
                                        .watermark("f2", "f2 - INTERVAL '2' SECONDS")
                                        .build())
                        .as("id", "state", "ts");
        tableEnv.createTemporaryView("source_table", table);
        final Table dedupeTable =
                tableEnv.sqlQuery(
                        "SELECT * FROM ("
                                + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table"
                                + ") WHERE row_num = 1");
        tableEnv.createTemporaryView("versioned_table", dedupeTable);

        DataStreamSource<Tuple2<Integer, Instant>> event =
                env.fromElements(
                        new Tuple2<>(0, Instant.ofEpochMilli(0)),
                        new Tuple2<>(0, Instant.ofEpochMilli(5)),
                        new Tuple2<>(0, Instant.ofEpochMilli(10)),
                        new Tuple2<>(0, Instant.ofEpochMilli(15)),
                        new Tuple2<>(0, Instant.ofEpochMilli(20)),
                        new Tuple2<>(0, Instant.ofEpochMilli(25)));

        final Table eventTable =
                tableEnv.fromDataStream(
                                event,
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT())
                                        .column("f1", DataTypes.TIMESTAMP_LTZ(3))
                                        .watermark("f1", "f1 - INTERVAL '2' SECONDS")
                                        .build())
                        .as("id", "ts");

        tableEnv.createTemporaryView("event_table", eventTable);

        final Table result =
                tableEnv.sqlQuery(
                        "SELECT * FROM event_table"
                                + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
                                + " ON event_table.id = versioned_table.id");
        result.execute().print();

        result.filter($("state").isEqual("online")).execute().print();
    }
}
{code}
 

The temporal join produces the following result:

+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
| op |          id |                      ts |         id0 |                          state |                     ts0 |              row_num |
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
| +I |           0 | 1970-01-01 08:00:00.000 |           0 |                         online | 1970-01-01 08:00:00.000 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.005 |           0 |                         online | 1970-01-01 08:00:00.000 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.010 |           0 |                        offline | 1970-01-01 08:00:00.010 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.015 |           0 |                        offline | 1970-01-01 08:00:00.010 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.020 |           0 |                         online | 1970-01-01 08:00:00.020 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.025 |           0 |                         online | 1970-01-01 08:00:00.020 |                    1 |
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+

 

After filtering with the predicate state = 'online', I expect only the two rows with state 'offline' to be filtered out, leaving four rows. However, only the last two rows are returned; the rows with ts 08:00:00.000 and 08:00:00.005 are missing:

+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
| op |          id |                      ts |         id0 |                          state |                     ts0 |              row_num |
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
| +I |           0 | 1970-01-01 08:00:00.020 |           0 |                         online | 1970-01-01 08:00:00.020 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.025 |           0 |                         online | 1970-01-01 08:00:00.020 |                    1 |
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
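For reference, the expected semantics can be sketched in plain Java, independent of Flink. This is only an illustrative model, not how the planner evaluates the query: the class name ExpectedTemporalJoin and the method onlineEventTimestamps are hypothetical, and timestamps are given as epoch milliseconds. For each event it picks the latest version whose timestamp is not after the event time, then applies the state = 'online' filter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the expected result; it does not use or mirror Flink internals.
class ExpectedTemporalJoin {

    // Returns the timestamps (epoch millis) of events whose joined state is "online".
    static List<Long> onlineEventTimestamps() {
        // Versions of the row with id = 0, keyed by the time each version becomes valid.
        TreeMap<Long, String> versions = new TreeMap<>();
        versions.put(0L, "online");
        versions.put(10L, "offline");
        versions.put(20L, "online");

        // Event timestamps from the reproduction above (all with id = 0).
        long[] eventTimestamps = {0L, 5L, 10L, 15L, 20L, 25L};

        List<Long> matched = new ArrayList<>();
        for (long ts : eventTimestamps) {
            // FOR SYSTEM_TIME AS OF ts: latest version with version time <= ts.
            Map.Entry<Long, String> version = versions.floorEntry(ts);
            // Filter applied on top of the join result: state = 'online'.
            if (version != null && "online".equals(version.getValue())) {
                matched.add(ts);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        System.out.println(onlineEventTimestamps()); // prints [0, 5, 20, 25]
    }
}
```

Under this model the filter keeps the events at 0, 5, 20 and 25 ms, whereas the actual output above contains only the events at 20 and 25 ms.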


