Posted to user@flink.apache.org by Henry Dai <dh...@gmail.com> on 2020/11/04 13:35:33 UTC

A question about Flink SQL retract streams

Dear Flink developers and users,

    I have a question about Flink SQL that has been giving me a lot of
trouble. Thank you very much for any help.

    Let's assume we have two data streams, `order` and `order_detail`, both
read from the MySQL binlog.

    Table `order` schema:
    id          int     primary key
    order_id    int
    status      int

    Table `order_detail` schema:
    id          int     primary key
    order_id    int
    quantity    int

    order : order_detail = 1:N; the two tables are joined on `order_id`.

    Suppose we receive the following sequence of changes, and we compute
SUM(quantity) grouped by order.order_id after the two tables are joined:

    time   order                      order_detail                result
           id  order_id  status       id  order_id  quantity
    1      1   12345     0
    2                                 1   12345     10            (T 10)
    3                                 2   12345     11            (F 10)(T 21)
    4                                 3   12345     12            (F 21)(T 33)
    5      1   12345     1                                        (F 33)(T 21)(F 21)(T 10)(F 10)(T 12)(F 12)(T 23)(F 23)(T 33)


    Code:
    tableEnv.registerTableSource("a", new Order());
    tableEnv.registerTableSource("b", new OrderDetail());

    // Latest version of each `order` row, keyed by primary key
    Table tbl1 = tableEnv.sqlQuery(
        "SELECT id, LAST_VALUE(order_id) AS order_id, LAST_VALUE(status) AS status "
            + "FROM a GROUP BY id");
    tableEnv.registerTable("ax", tbl1);

    // Latest version of each `order_detail` row, keyed by primary key
    Table tbl2 = tableEnv.sqlQuery(
        "SELECT id, LAST_VALUE(order_id) AS order_id, LAST_VALUE(quantity) AS quantity "
            + "FROM b GROUP BY id");
    tableEnv.registerTable("bx", tbl2);

    // Join the two views and sum the quantity per order
    Table table = tableEnv.sqlQuery(
        "SELECT ax.order_id, SUM(bx.quantity) "
            + "FROM ax JOIN bx ON ax.order_id = bx.order_id GROUP BY ax.order_id");
    DataStream<Tuple2<Boolean, Row>> stream = tableEnv.toRetractStream(table, Row.class);
    stream.print();

    Result:
    (true,12345,10)
    (false,12345,10)
    (true,12345,21)
    (false,12345,21)
    (true,12345,33)
    (false,12345,33)
    (true,12345,21)
    (false,12345,21)
    (true,12345,10)
    (false,12345,10)
    (true,12345,12)
    (false,12345,12)
    (true,12345,23)
    (false,12345,23)
    (true,12345,33)
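
    The printed sequence is at least consistent with the small plain-Java
model below. It is only a sketch and not Flink code: the class
`RetractSumModel` and its methods are hypothetical names, and the order of
changes at time 5 (three retractions of the old joined rows, then three
insertions of the new ones) is my assumption, chosen only because it
reproduces the output above. I do not know whether this is what Flink does
internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a grouped SUM that consumes a changelog of joined rows.
public class RetractSumModel {
    private long sum = 0;   // current SUM(quantity) for order_id 12345
    private int rows = 0;   // number of joined rows currently in the group
    private final List<String> out = new ArrayList<>();

    // Apply one change: add = true inserts a joined row, add = false retracts it.
    void apply(boolean add, int quantity) {
        if (rows > 0) {
            out.add("(F " + sum + ")");   // withdraw the previously emitted sum
        }
        sum += add ? quantity : -quantity;
        rows += add ? 1 : -1;
        if (rows > 0) {
            out.add("(T " + sum + ")");   // emit the new sum unless the group is empty
        }
    }

    public static List<String> replay() {
        RetractSumModel m = new RetractSumModel();
        // Times 2 to 4: three order_detail rows arrive and join with the order row.
        m.apply(true, 10);
        m.apply(true, 11);
        m.apply(true, 12);
        // Time 5 (assumed order): old joined rows are retracted, then re-added.
        m.apply(false, 12);
        m.apply(false, 11);
        m.apply(false, 10);
        m.apply(true, 12);
        m.apply(true, 11);
        m.apply(true, 10);
        return m.out;
    }

    public static void main(String[] args) {
        System.out.println(String.join("", replay()));
    }
}
```

    Under these assumptions the model produces the same 15 messages as the
result above, 10 of them at time 5.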


    I can't understand why Flink emits so many records at time 5.

    In production, we consume the binlog stream from Kafka, convert it to a
Flink table, run the SQL computation, and convert the result table back to a
Flink stream. In that retract stream we keep only the TRUE messages and emit
them to Kafka downstream.
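
    To show what this means for the downstream consumer, here is a
plain-Java sketch of that last step applied to the ten messages printed at
time 5. It does not use the Flink API: `KeepAccumulateOnly` and `Change` are
hypothetical stand-ins for our job and for `Tuple2<Boolean, Row>`. In the real
job the same thing is a filter on the Boolean field of each tuple.

```java
import java.util.ArrayList;
import java.util.List;

public class KeepAccumulateOnly {
    // Hypothetical stand-in for one Tuple2<Boolean, Row> of the retract stream.
    public static final class Change {
        final boolean accumulate; // true = add message, false = retract message
        final int orderId;
        final long total;

        Change(boolean accumulate, int orderId, long total) {
            this.accumulate = accumulate;
            this.orderId = orderId;
            this.total = total;
        }
    }

    // The ten messages printed at time 5, copied from the result above.
    public static List<Change> time5() {
        boolean[] flags = {false, true, false, true, false, true, false, true, false, true};
        long[] totals = {33, 21, 21, 10, 10, 12, 12, 23, 23, 33};
        List<Change> changes = new ArrayList<>();
        for (int i = 0; i < flags.length; i++) {
            changes.add(new Change(flags[i], 12345, totals[i]));
        }
        return changes;
    }

    // Drop the retract messages and return the totals of the TRUE messages, in order.
    public static List<Long> keptTotals(List<Change> changes) {
        List<Long> kept = new ArrayList<>();
        for (Change c : changes) {
            if (c.accumulate) {
                kept.add(c.total);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(keptTotals(time5())); // prints [21, 10, 12, 23, 33]
    }
}
```

    So for a single status update on `order`, the downstream topic receives
five totals for order 12345, and only the last one (33) is the value we
actually want.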

    Is there a way to get real dynamic-table behavior from Flink, meaning
that the computation is triggered only once? When we receive (1,12345,1) from
`order`, we would like to emit only (F 33)(T 33).

-- 
best wishes
hengyu