Posted to dev@flink.apache.org by "Caizhi Weng (Jira)" <ji...@apache.org> on 2021/12/17 03:12:00 UTC

[jira] [Created] (FLINK-25357) SQL planner incorrectly changes a streaming join with FLOOR(rowtime) into interval join

Caizhi Weng created FLINK-25357:
-----------------------------------

             Summary: SQL planner incorrectly changes a streaming join with FLOOR(rowtime) into interval join
                 Key: FLINK-25357
                 URL: https://issues.apache.org/jira/browse/FLINK-25357
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.14.2
            Reporter: Caizhi Weng


This issue was reported on the [user mailing list|https://lists.apache.org/thread/v8omhomp58hb8m5dj4noxbr1dsyy6zjl].

Add the following test case to {{TableEnvironmentITCase}} to reproduce this issue.
{code:scala}
@Test
def myTest(): Unit = {
  val data = Seq(
    Row.of(
      "1",
      java.time.LocalDateTime.of(2021, 12, 13, 12, 5, 8)
    ),
    Row.of(
      "1",
      java.time.LocalDateTime.of(2021, 12, 13, 13, 5, 4)
    ),
    Row.of(
      "1",
      java.time.LocalDateTime.of(2021, 12, 13, 14, 5, 6)
    )
  )

  tEnv.executeSql(
    s"""
       |create table T (
       |  id STRING,
       |  b TIMESTAMP(3),
       |  WATERMARK FOR b AS b - INTERVAL '60' MINUTES
       |) WITH (
       |  'connector' = 'values',
       |  'bounded' = 'true',
       |  'data-id' = '${TestValuesTableFactory.registerData(data)}'
       |)
       |""".stripMargin)
  tEnv.executeSql(
    """
      |SELECT
      |  source.id AS sourceid,
      |  CAST(source.b AS TIMESTAMP) AS source_startat,
      |  CAST(target.b AS TIMESTAMP) AS target_startat
      |FROM T source, T target
      |WHERE source.id = target.id
      |AND source.id IN ('1', '2', '3')
      |AND source.b >= FLOOR(target.b TO HOUR) + INTERVAL '1' HOUR AND source.b < FLOOR(target.b TO HOUR) + INTERVAL '2' HOUR
      |""".stripMargin).print()
}
{code}

The (correct) results for the batch task are
{code}
+--------------------------------+----------------------------+----------------------------+
|                       sourceid |             source_startat |             target_startat |
+--------------------------------+----------------------------+----------------------------+
|                              1 | 2021-12-13 13:05:04.000000 | 2021-12-13 12:05:08.000000 |
|                              1 | 2021-12-13 14:05:06.000000 | 2021-12-13 13:05:04.000000 |
+--------------------------------+----------------------------+----------------------------+
{code}
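
As a cross-check of the expected result, here is a minimal sketch in plain Java (no Flink involved) that evaluates the WHERE clause on every pair of the three test rows. The class and method names are hypothetical and only serve this illustration; {{truncatedTo(ChronoUnit.HOURS)}} is assumed to be an adequate stand-in for {{FLOOR(b TO HOUR)}} on these values.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: evaluates the join predicate by hand.
final class FloorPredicateSketch {

    // The three values of column b from the test case.
    static final List<LocalDateTime> ROWS = List.of(
            LocalDateTime.of(2021, 12, 13, 12, 5, 8),
            LocalDateTime.of(2021, 12, 13, 13, 5, 4),
            LocalDateTime.of(2021, 12, 13, 14, 5, 6));

    // source.b >= FLOOR(target.b TO HOUR) + INTERVAL '1' HOUR
    // AND source.b < FLOOR(target.b TO HOUR) + INTERVAL '2' HOUR
    static boolean matches(LocalDateTime source, LocalDateTime target) {
        LocalDateTime hour = target.truncatedTo(ChronoUnit.HOURS);
        return !source.isBefore(hour.plusHours(1)) && source.isBefore(hour.plusHours(2));
    }

    // Nested-loop self join; all ids are "1", so the id predicates always hold.
    static List<String> joinedPairs() {
        List<String> out = new ArrayList<>();
        for (LocalDateTime source : ROWS) {
            for (LocalDateTime target : ROWS) {
                if (matches(source, target)) {
                    out.add(source + " | " + target);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        joinedPairs().forEach(System.out::println);
    }
}
```

Under these assumptions the sketch yields the pairs (13:05:04, 12:05:08) and (14:05:06, 13:05:04), which agrees with the batch output.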

The (incorrect) results for the streaming task are
{code}
+----+--------------------------------+----------------------------+----------------------------+
| op |                       sourceid |             source_startat |             target_startat |
+----+--------------------------------+----------------------------+----------------------------+
| +I |                              1 | 2021-12-13 14:05:06.000000 | 2021-12-13 12:05:08.000000 |
| +I |                              1 | 2021-12-13 14:05:06.000000 | 2021-12-13 13:05:04.000000 |
+----+--------------------------------+----------------------------+----------------------------+
{code}

The plan for the streaming task is
{code}
LogicalProject(sourceid=[$0], source_startat=[CAST($1):TIMESTAMP(6)], target_startat=[CAST($3):TIMESTAMP(6)])
+- LogicalFilter(condition=[AND(=($0, $2), OR(=($0, _UTF-16LE'1'), =($0, _UTF-16LE'2'), =($0, _UTF-16LE'3')), >=($1, +(FLOOR($3, FLAG(HOUR)), 3600000:INTERVAL HOUR)), <($1, +(FLOOR($3, FLAG(HOUR)), 7200000:INTERVAL HOUR)))])
   +- LogicalJoin(condition=[true], joinType=[inner])
      :- LogicalWatermarkAssigner(rowtime=[b], watermark=[-($1, 3600000:INTERVAL MINUTE)])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, T]])
      +- LogicalWatermarkAssigner(rowtime=[b], watermark=[-($1, 3600000:INTERVAL MINUTE)])
         +- LogicalTableScan(table=[[default_catalog, default_database, T]])

== Optimized Physical Plan ==
Calc(select=[id AS sourceid, CAST(CAST(b)) AS source_startat, CAST(CAST(b0)) AS target_startat])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=3600000, leftUpperBound=7199999, leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id, id0), >=(b, +(FLOOR(b0, FLAG(HOUR)), 3600000:INTERVAL HOUR)), <(b, +(FLOOR(b0, FLAG(HOUR)), 7200000:INTERVAL HOUR)))], select=[id, b, id0, b0])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, b], where=[SEARCH(id, Sarg[_UTF-16LE'1', _UTF-16LE'2', _UTF-16LE'3']:CHAR(1) CHARACTER SET "UTF-16LE")])
   :     +- WatermarkAssigner(rowtime=[b], watermark=[-(b, 3600000:INTERVAL MINUTE)])
   :        +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, b])
   +- Exchange(distribution=[hash[id]])
      +- WatermarkAssigner(rowtime=[b], watermark=[-(b, 3600000:INTERVAL MINUTE)])
         +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, b])

== Optimized Execution Plan ==
Calc(select=[id AS sourceid, CAST(CAST(b)) AS source_startat, CAST(CAST(b0)) AS target_startat])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=3600000, leftUpperBound=7199999, leftTimeIndex=1, rightTimeIndex=1], where=[((id = id0) AND (b >= (FLOOR(b0, FLAG(HOUR)) + 3600000:INTERVAL HOUR)) AND (b < (FLOOR(b0, FLAG(HOUR)) + 7200000:INTERVAL HOUR)))], select=[id, b, id0, b0])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, b], where=[SEARCH(id, Sarg[_UTF-16LE'1', _UTF-16LE'2', _UTF-16LE'3']:CHAR(1) CHARACTER SET "UTF-16LE")])
   :     +- WatermarkAssigner(rowtime=[b], watermark=[(b - 3600000:INTERVAL MINUTE)])(reuse_id=[1])
   :        +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, b])
   +- Exchange(distribution=[hash[id]])
      +- Reused(reference_id=[1])
{code}

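You can see that the planner incorrectly converts this join into an interval join with {{leftLowerBound=3600000}} and {{leftUpperBound=7199999}}. The generated condition for the interval join is also incorrect: as shown below, it always returns {{true}}, so the {{FLOOR}} predicates are never evaluated. This causes the first row of the streaming result to be produced.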
{code:java}
public class ConditionFunction$171
        extends org.apache.flink.api.common.functions.AbstractRichFunction
        implements org.apache.flink.table.runtime.generated.JoinCondition {

    public ConditionFunction$171(Object[] references) throws Exception {}

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {}

    @Override
    public boolean apply(
            org.apache.flink.table.data.RowData in1, org.apache.flink.table.data.RowData in2)
            throws Exception {

        return true;
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
}
{code}
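
To illustrate why the streaming result differs, here is a minimal sketch in plain Java (no Flink involved) that applies only the window bounds from the optimized plan to the three test rows. It assumes the bounds are checked as {{leftLowerBound <= left.b - right.b <= leftUpperBound}} in milliseconds, with {{source}} as the left input, and that the remaining condition always passes, as in the generated {{ConditionFunction$171}}. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Flink code: mimics an interval join that checks only its bounds.
final class IntervalBoundsSketch {

    // Bounds copied from windowBounds=[...] in the optimized plan (milliseconds).
    static final long LEFT_LOWER_BOUND = 3_600_000L;
    static final long LEFT_UPPER_BOUND = 7_199_999L;

    // The three values of column b from the test case.
    static final List<LocalDateTime> ROWS = List.of(
            LocalDateTime.of(2021, 12, 13, 12, 5, 8),
            LocalDateTime.of(2021, 12, 13, 13, 5, 4),
            LocalDateTime.of(2021, 12, 13, 14, 5, 6));

    // Assumed bound check: left.b - right.b must lie in [lower, upper].
    static boolean withinBounds(LocalDateTime left, LocalDateTime right) {
        long diffMillis = Duration.between(right, left).toMillis();
        return diffMillis >= LEFT_LOWER_BOUND && diffMillis <= LEFT_UPPER_BOUND;
    }

    // Self join where the non-time condition is the constant "true".
    static List<String> joinedPairs() {
        List<String> out = new ArrayList<>();
        for (LocalDateTime source : ROWS) {
            for (LocalDateTime target : ROWS) {
                if (withinBounds(source, target)) {
                    out.add(source + " | " + target);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        joinedPairs().forEach(System.out::println);
    }
}
```

Under these assumptions the sketch yields (14:05:06, 12:05:08), a difference of 7198000 ms, and (14:05:06, 13:05:04), a difference of 3602000 ms, which is what the streaming task prints. The expected pair (13:05:04, 12:05:08) is dropped because its difference of 3596000 ms is below the lower bound. A fixed offset between the two rowtimes cannot express a predicate on {{FLOOR(target.b TO HOUR)}}, so the conversion to an interval join changes the semantics of the query.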


