Posted to user-zh@flink.apache.org by Asahi Lee <97...@qq.com> on 2021/03/06 08:50:40 UTC

Flink 1.12.2: incorrect date conversion when setting a rowtime column while converting a DataStream to a Table

Hello!
My program is as follows:
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv);

DataStream<Row> rowDataStreamSource = bsEnv.addSource(new SourceFunction<Row>() {
    @Override
    public void run(SourceContext<Row> sourceContext) throws Exception {
        while (true) {
            Thread.sleep(1000);
            Row row = new Row(RowKind.INSERT, 2);
            row.setField(0, "a");
            row.setField(1, new Timestamp(System.currentTimeMillis()));
            sourceContext.collect(row);
        }
    }

    @Override
    public void cancel() {

    }
})
        .returns(new RowTypeInfo(new TypeInformation[]{Types.STRING, Types.SQL_TIMESTAMP}))
        .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

Table table = bsTableEnv.fromDataStream(rowDataStreamSource, $("a"), $("b").rowtime());
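table.execute().print();

After I use $("b").rowtime() to turn the timestamp column into a rowtime attribute, executing and printing the table shows incorrect dates, as follows:

| +I |                              a | +1705471-09-26T16:47... |
| +I |                              a | +1705471-09-26T16:47... |
| +I |                              a | +1705471-09-26T16:47... |

How should I handle this? Also, the only types that can be converted to a rowtime column are long and timestamp; when I use a LocalDateTime object, the conversion fails. Could the LocalDateTime type be supported?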
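
For the LocalDateTime part, the only stopgap I can think of is to convert each value to one of the two accepted types (long or java.sql.Timestamp) before the stream is turned into a Table. Below is a minimal sketch of that conversion using only the JDK. The class and method names are hypothetical, and treating the LocalDateTime as UTC for the long variant is my own assumption. I have not verified how Flink interprets the resulting values.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper, not part of Flink: maps LocalDateTime to the two
// types that the rowtime conversion accepted in my tests.
public class LocalDateTimeRowtimeSketch {

    // Assumption: the LocalDateTime is interpreted as UTC wall-clock time.
    public static long toEpochMillis(LocalDateTime ldt) {
        return ldt.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Timestamp.valueOf interprets the value in the JVM default time zone.
    public static Timestamp toSqlTimestamp(LocalDateTime ldt) {
        return Timestamp.valueOf(ldt);
    }

    public static void main(String[] args) {
        LocalDateTime ldt = LocalDateTime.of(2021, 3, 6, 8, 50, 40);
        System.out.println(toEpochMillis(ldt));
        System.out.println(toSqlTimestamp(ldt));
    }
}
```

In the program above, this would mean calling one of these helpers inside run() before row.setField(1, ...), so that the field still matches Types.SQL_TIMESTAMP (or Types.LONG if the long variant is used).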