You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by aiden <18...@163.com> on 2023/02/27 10:03:38 UTC

Flink Kafka Sink时间戳异常

hi,我在使用1.16.0版本时遇到kafka sink 时间戳异常大的情况,以下分别为正常和异常数据
正常:
{
      "partition": 0,
      "offset": 16,
      "msg": "xxxxx",
      "timespan": 1677487065330,
      "date": "2023-02-27 16:37:45"
    }
异常:
    {
      "partition": 0,
      "offset": 17,
      "msg": "xxxxxx",
      "timespan": 9223372036854776000,
      "date": "292278994-08-17 15:12:55"
    }
最终发现是由于使用了countWindow算子导致的,推测是由于这个算子窗口为GlobalWindow导致的,有什么方式可以避免这个异常吗?或者可以在序列化kafka sink时手动指定时间戳吗?

Re:Flink Kafka Sink时间戳异常

Posted by haishui <ha...@126.com>.
hi,


这个问题是因为经过窗口算子后StreamRecord中指定的时间时间戳被改成了window.maxTimestamp(),可以查看[1]中WindowOperator或EvictingWindowOperator中的emitWindowContents方法。


如果想要更改时间戳,可以实现一个ProcessFuncton
TimestampedCollector<T> collector = (TimestampedCollector<T>) out;
collector.setAbsoluteTimestamp(   <value.getTimestampField()>   );
collector.collect(value);


如果可以接受kafka内数据使用插入时间,则可以设置topic的log.message.timestamp.type=LogAppendTime







[1] https://github.com/apache/flink/tree/edac2adb9523adcb69e1dacc5fd4ea8f63480175/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing








在 2023-02-27 18:03:38,"aiden" <18...@163.com> 写道:
>
>hi,我在使用1.16.0版本时遇到kafka sink 时间戳异常大的情况,以下分别为正常和异常数据
>正常:
>{
>      "partition": 0,
>      "offset": 16,
>      "msg": "xxxxx",
>      "timespan": 1677487065330,
>      "date": "2023-02-27 16:37:45"
>    }
>异常:
>    {
>      "partition": 0,
>      "offset": 17,
>      "msg": "xxxxxx",
>      "timespan": 9223372036854776000,
>      "date": "292278994-08-17 15:12:55"
>    }
>最终发现是由于使用了countWindow算子导致的,推测是由于这个算子窗口为GlobalWindow导致的,有什么方式可以避免这个异常吗?或者可以在序列化kafka sink时手动指定时间戳吗?