You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Asahi Lee <97...@qq.com> on 2021/03/06 08:50:40 UTC
flink 1.12.2版本,DataStream转为Table时设置rowtime列,日期转换错误
你好!
我的程序如下:
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv);
DataStream<Row> rowDataStreamSource = bsEnv.addSource(new SourceFunction<Row>() {
@Override
public void run(SourceContext<Row> sourceContext) throws Exception {
while (true) {
Thread.sleep(1000);
Row row = new Row(RowKind.INSERT, 2);
row.setField(0, "a");
row.setField(1, new Timestamp(System.currentTimeMillis()));
sourceContext.collect(row);
}
}
@Override
public void cancel() {
}
})
.returns(new RowTypeInfo(new TypeInformation[]{Types.STRING, Types.SQL_TIMESTAMP}))
.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
Table table = bsTableEnv.fromDataStream(rowDataStreamSource, $("a"), $("b").rowtime());
table.execute().print();其中我使用$("b").rowtime()将时间戳列转换为rowtime时间后,table执行打印,展示的日期错误,如下:| +I | a | +1705471-09-26T16:47... | | +I | a | +1705471-09-26T16:47... | | +I | a | +1705471-09-26T16:47... |请问,这个这个我该如何处理?另外,可以转换为rowtime列的类型只有long和timestamp,我使用LocalDateTime对象时则无法转换,是否可以支持LocalDateTime类型?