Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2021/01/19 03:27:00 UTC

[jira] [Comment Edited] (FLINK-21013) Blink planner does not ingest timestamp into StreamRecord

    [ https://issues.apache.org/jira/browse/FLINK-21013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17267617#comment-17267617 ] 

Jark Wu edited comment on FLINK-21013 at 1/19/21, 3:26 AM:
-----------------------------------------------------------

I think this feature has been missing since the Blink merge.
See the difference between the Blink planner [1] and the old planner [2].

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala#L133
[2] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/DataStreamConversions.scala#L108


was (Author: jark):
I think this is an missing feature since Blink merge. 
See the difference between blink-planner[1] and old-planner [2].

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala#L133
[2] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/DataStreamConversions.scala#L108

> Blink planner does not ingest timestamp into StreamRecord
> ---------------------------------------------------------
>
>                 Key: FLINK-21013
>                 URL: https://issues.apache.org/jira/browse/FLINK-21013
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner, Table SQL / Runtime
>    Affects Versions: 1.12.0
>            Reporter: Timo Walther
>            Priority: Blocker
>             Fix For: 1.12.2
>
>
> Currently, the rowtime attribute is not put into the StreamRecord when converting from the Table API to the DataStream API. The legacy planner supports this, but the timestamp is null when using the Blink planner.
> {code}
> // Assumption: Order is a POJO with the fields user, product, and amount.
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> EnvironmentSettings settings =
>         EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>
> DataStream<Order> orderA =
>         env.fromCollection(
>                 Arrays.asList(
>                         new Order(1L, "beer", 3),
>                         new Order(1L, "diaper", 4),
>                         new Order(3L, "rubber", 2)));
>
> // Use the "user" field as the event timestamp and emit a watermark per element.
> DataStream<Order> orderB =
>         orderA.assignTimestampsAndWatermarks(
>                 new AssignerWithPunctuatedWatermarks<Order>() {
>                     @Nullable
>                     @Override
>                     public Watermark checkAndGetNextWatermark(
>                             Order lastElement, long extractedTimestamp) {
>                         return new Watermark(extractedTimestamp);
>                     }
>
>                     @Override
>                     public long extractTimestamp(Order element, long recordTimestamp) {
>                         return element.user;
>                     }
>                 });
>
> // Declare "user" as the rowtime attribute of the table.
> Table tableA = tEnv.fromDataStream(orderB, $("user").rowtime(), $("product"), $("amount"));
>
> // Run a pass-through query and convert the result back to a DataStream.
> Table result = tEnv.sqlQuery("SELECT * FROM " + tableA);
> tEnv.toAppendStream(result, Row.class)
>         .process(
>                 new ProcessFunction<Row, Row>() {
>                     @Override
>                     public void processElement(Row value, Context ctx, Collector<Row> out)
>                             throws Exception {
>                         // With the Blink planner, ctx.timestamp() is null here.
>                         System.out.println("TIMESTAMP" + ctx.timestamp());
>                     }
>                 })
>         .print();
> env.execute();
> {code}
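
For readers unfamiliar with what "ingesting the timestamp into the StreamRecord" means, the following is a minimal, Flink-free sketch of the difference described in the issue. It is only a toy model: TimedRecord, convertDroppingRowtime, and convertIngestingRowtime are hypothetical names invented for illustration and are not Flink classes or methods, and the sketch makes no claim about how either planner is actually implemented.

```java
public class RowtimeIngestSketch {

    /** Hypothetical stand-in for a stream record: a row plus an optional timestamp. */
    public static final class TimedRecord {
        public final Object[] row;
        public final Long timestamp; // null means no timestamp is attached

        TimedRecord(Object[] row, Long timestamp) {
            this.row = row;
            this.timestamp = timestamp;
        }
    }

    /** Models the reported behavior: the row is forwarded, the rowtime is not copied. */
    public static TimedRecord convertDroppingRowtime(Object[] row) {
        return new TimedRecord(row, null);
    }

    /** Models the expected behavior: the rowtime field becomes the record timestamp. */
    public static TimedRecord convertIngestingRowtime(Object[] row, int rowtimeIndex) {
        return new TimedRecord(row, (Long) row[rowtimeIndex]);
    }

    public static void main(String[] args) {
        // Same shape as the first Order in the issue: (user, product, amount).
        Object[] row = {1L, "beer", 3};
        System.out.println("TIMESTAMP" + convertDroppingRowtime(row).timestamp);
        System.out.println("TIMESTAMP" + convertIngestingRowtime(row, 0).timestamp);
    }
}
```

In this toy model the first call leaves the timestamp null, which mirrors what the ProcessFunction in the issue observes through ctx.timestamp(), while the second call carries the rowtime value along with the row.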


