You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Matthias Broecheler <ma...@dataeng.ai> on 2022/10/10 21:12:53 UTC

Preserve rowtime through join

Hey Flinksters,

I was wondering if you had any ideas for how to preserve the rowtime across
an INNER equi join so that the output can be used in a temporal join.

I've attached an example based on the TemporalJoinTest in which I create
two views by deduplicating the underlying streams (rates_pk and agency_pk)
and then join those views on agencyid (the primary key of agency_pk). I
deduplicate the joined output so that the primary key of rates_pk (currency)
becomes the primary key of the output, but when I run
testDoubleInlineJoinState, I get the exception:

java.lang.AssertionError: Sql optimization: Assertion error: type mismatch:
ref:
TIMESTAMP(3) *ROWTIME*
input:
TIMESTAMP(3)

which indicates that the rowtime is not preserved. Intuitively, it seems
that the bigger of the two input rowtimes should be the resulting rowtime,
but I tried using GREATEST and that didn't work either.

Note that my example works if I do two temporal joins (testDoubleJoin) or
join the underlying streams before deduplicating
(testDoubleInlineJoinStream), so I feel like I'm not too far off. It's
just that the way I generate the Flink plan happens to create the
joined state table first, and I'd love to find a way to tell Flink how to
preserve the rowtime.

Is there a way to do that?
Thanks a lot,
Matthias

--------------- CODE -------------

package org.apache.flink.table.planner.plan.stream.sql.join

import org.apache.flink.table.api.{ExplainDetail, ValidationException}
import org.apache.flink.table.planner.utils.{StreamTableTestUtil, TableTestBase}
import org.junit.Assert.{assertTrue, fail}
import org.junit.{Before, Test}

/**
 * Stream-mode planner experiments checking whether an event-time attribute (`rowtime`)
 * survives an INNER equi-join, so that the joined result can be used as the versioned side of
 * an event-time temporal join (`FOR SYSTEM_TIME AS OF o.rowtime`).
 *
 * The tests only print the plan produced by `explain()`; none of them asserts on it. As
 * reported for this experiment, [[testDoubleInlineJoinState]] fails during SQL optimization
 * with `AssertionError: type mismatch` (`TIMESTAMP(3) *ROWTIME*` expected, plain
 * `TIMESTAMP(3)` found). [[testDoubleInlineJoinStream]] and [[testDoubleJoin]] plan
 * successfully.
 */
class TemporalJoinTestExperimentShare extends TableTestBase {

  val util: StreamTableTestUtil = streamTestUtil()

  /**
   * Registers the source tables `Orders`, `RatesHistory` and `AgencyHistory` and four views
   * built on them. Each table uses the `values` connector and has a watermark on `rowtime`.
   */
  @Before
  def before(): Unit = {
    util.addTable(
      """
        |CREATE TABLE Orders (
        | amount INT,
        | currency STRING,
        | rowtime TIMESTAMP(3),
        | proctime AS PROCTIME(),
        | WATERMARK FOR rowtime AS rowtime
        |) WITH (
        | 'connector' = 'values'
        |)
      """.stripMargin)

    util.addTable(
      """
        |CREATE TABLE RatesHistory (
        | currency STRING,
        | rate INT,
        | agencyid INT,
        | rowtime TIMESTAMP(3),
        | WATERMARK FOR rowtime AS rowtime
        |) WITH (
        | 'connector' = 'values'
        |)
      """.stripMargin)

    util.addTable(
      """
        |CREATE TABLE AgencyHistory (
        | agencyid INT,
        | name STRING,
        | rowtime TIMESTAMP(3),
        | WATERMARK FOR rowtime AS rowtime
        |) WITH (
        | 'connector' = 'values'
        |)
      """.stripMargin)

    // Latest row per currency (ROW_NUMBER ... ORDER BY rowtime DESC, keep rowNum = 1),
    // so each currency appears at most once in this view.
    util.addTable(
      " CREATE VIEW rates_pk AS SELECT currency, rate, agencyid, rowtime FROM " +
        "  (SELECT *, " +
        "          ROW_NUMBER() OVER (PARTITION BY currency " +
        "ORDER BY rowtime DESC) AS rowNum " +
        "   FROM RatesHistory" +
        "  ) T " +
        "  WHERE rowNum = 1")

    // Latest row per agencyid, so each agencyid appears at most once in this view.
    util.addTable(
      " CREATE VIEW agency_pk AS SELECT agencyid, name, rowtime FROM " +
        "  (SELECT *, " +
        "          ROW_NUMBER() OVER (PARTITION BY agencyid " +
        "ORDER BY rowtime DESC) AS rowNum " +
        "   FROM AgencyHistory" +
        "  ) T " +
        "  WHERE rowNum = 1")

    // Joins the raw streams first, with a condition that bounds r.rowtime relative to
    // a.rowtime. The 999-year upper bound presumably stands in for "no upper bound".
    // The join result is then deduplicated per currency.
    util.addTable(
      " CREATE VIEW rates_and_agency_pk_stream AS " +
        "SELECT currency, rate, rowtime, agencyname FROM " +
        "  (SELECT *, " +
        "          ROW_NUMBER() OVER (PARTITION BY currency " +
        "ORDER BY rowtime DESC) AS rowNum " +
        "   FROM (SELECT r.currency, r.rate,  r.rowtime, a.name AS agencyname " +
        "FROM RatesHistory AS r JOIN AgencyHistory AS a " +
        "ON r.agencyid = a.agencyid AND r.rowtime >= a.rowtime " +
        "AND r.rowtime <= a.rowtime + INTERVAL '999' YEAR(3)) X " +
        "  ) T " +
        "  WHERE rowNum = 1")

    // Joins the two already-deduplicated views on agencyid only (no time condition), then
    // deduplicates per currency again. Using this view in a temporal join is the case
    // reported to fail because `rowtime` is not preserved as a rowtime attribute.
    util.addTable(
      " CREATE VIEW rates_and_agency_pk_state AS " +
        "SELECT currency, rate, rowtime, agencyname FROM " +
        "  (SELECT *, " +
        "          ROW_NUMBER() OVER (PARTITION BY currency " +
        "ORDER BY rowtime DESC) AS rowNum " +
        "   FROM (SELECT r.currency, r.rate, r.rowtime as rowtime, a.name AS agencyname " +
        "FROM rates_pk AS r JOIN agency_pk AS a ON r.agencyid = a.agencyid) X " +
        "  ) T " +
        "  WHERE rowNum = 1")
  }

  /** Temporal join against the view that joins the raw streams before deduplicating. */
  @Test
  def testDoubleInlineJoinStream(): Unit = {
    val sqlQuery = "SELECT * " +
      "FROM Orders AS o JOIN " +
      "rates_and_agency_pk_stream " +
      "FOR SYSTEM_TIME AS OF o.rowtime AS r " +
      "ON o.currency = r.currency "

    printPlan(sqlQuery)
  }

  /**
   * Temporal join against the view that joins the deduplicated views. Reported to fail
   * during optimization with a `TIMESTAMP(3) *ROWTIME*` vs `TIMESTAMP(3)` type mismatch.
   */
  @Test
  def testDoubleInlineJoinState(): Unit = {
    val sqlQuery = "SELECT * " +
      "FROM Orders AS o JOIN " +
      "rates_and_agency_pk_state " +
      "FOR SYSTEM_TIME AS OF o.rowtime AS r " +
      "ON o.currency = r.currency "

    printPlan(sqlQuery)
  }

  /** Two chained temporal joins, one against each deduplicated view. */
  @Test
  def testDoubleJoin(): Unit = {
    val sqlQuery = "SELECT * " +
      "FROM Orders AS o JOIN " +
      "rates_pk " +
      "FOR SYSTEM_TIME AS OF o.rowtime AS r " +
      "ON o.currency = r.currency " +
      "JOIN agency_pk " +
      "FOR SYSTEM_TIME AS OF o.rowtime AS a " +
      "ON r.agencyid = a.agencyid"

    printPlan(sqlQuery)
  }

  /**
   * Prints the plan that `explain()` returns for `sqlQuery`. Any planning error is not
   * caught here and propagates to the calling test.
   */
  private def printPlan(sqlQuery: String): Unit =
    println(util.tableEnv.sqlQuery(sqlQuery).explain())
}