You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Timo Walther (Jira)" <ji...@apache.org> on 2021/01/18 16:24:00 UTC
[jira] [Created] (FLINK-21013) Blink planner does not ingest
timestamp into StreamRecord
Timo Walther created FLINK-21013:
------------------------------------
Summary: Blink planner does not ingest timestamp into StreamRecord
Key: FLINK-21013
URL: https://issues.apache.org/jira/browse/FLINK-21013
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner, Table SQL / Runtime
Reporter: Timo Walther
Currently, the rowtime attribute is not put into the StreamRecord when leaving the Table API to DataStream API. The legacy planner supports this, but the timestamp is null when using the Blink planner.
{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
DataStream<Order> orderA =
env.fromCollection(
Arrays.asList(
new Order(1L, "beer", 3),
new Order(1L, "diaper", 4),
new Order(3L, "rubber", 2)));
DataStream<Order> orderB =
orderA.assignTimestampsAndWatermarks(
new AssignerWithPunctuatedWatermarks<Order>() {
@Nullable
@Override
public Watermark checkAndGetNextWatermark(
Order lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp);
}
@Override
public long extractTimestamp(Order element, long recordTimestamp) {
return element.user;
}
});
Table tableA = tEnv.fromDataStream(orderB, $("user").rowtime(), $("product"), $("amount"));
// union the two tables
Table result = tEnv.sqlQuery("SELECT * FROM " + tableA);
tEnv.toAppendStream(result, Row.class)
.process(
new ProcessFunction<Row, Row>() {
@Override
public void processElement(Row value, Context ctx, Collector<Row> out)
throws Exception {
System.out.println("TIMESTAMP" + ctx.timestamp());
}
})
.print();
env.execute();
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)