Posted to user-zh@flink.apache.org by hb <34...@163.com> on 2020/06/05 07:06:48 UTC

Flink 1.10 SQL: help needed with an error

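A Flink SQL job initially ran normally, and checkpoints were normal too, with no large state (state size 19.7KB).
After running for a while, however, it started failing with the error below. It then recovers, runs for a few seconds, and immediately hits the same error again, so it cycles endlessly between failing and recovering.
Could someone please take a look? Much appreciated.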


2020-06-05 14:13:19,791 INFO org.apache.flink.runtime.taskmanager.Task - Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> SourceConversion(table=[default_catalog.default_database.user_visit_trace, source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], fields=[userId, utp, utrp, extendFields, requestTime]) -> Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime], where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') > _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS NOT NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)], joinType=[InnerJoin], async=[false], lookup=[user_id=userId], where=[shop_id IS NOT NULL], select=[item_id, userId, requestTime, shop_id, user_id]) -> Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT _UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT _UTF-16LE':' CONCAT item_id) AS redisKey, requestTime AS fieldName]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/8) (ad7a2d51beccbb39f83ac2c16b923bd0) switched from RUNNING to FAILED.
java.lang.Exception: Could not perform checkpoint 401 for operator Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> SourceConversion(table=[default_catalog.default_database.user_visit_trace, source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], fields=[userId, utp, utrp, extendFields, requestTime]) -> Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime], where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') > _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS NOT NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)], joinType=[InnerJoin], async=[false], lookup=[user_id=userId], where=[shop_id IS NOT NULL], select=[item_id, userId, requestTime, shop_id, user_id]) -> Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT _UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT _UTF-16LE':' CONCAT item_id) AS redisKey, requestTime AS fieldName]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/8).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$595/1311782208.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$597/1708409807.run(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
    ... 12 more
2020-06-05 14:13:19,795 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> SourceConversion(table=[default_catalog.default_database.user_visit_trace, source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], fields=[userId, utp, utrp, extendFields, requestTime]) -> Calc(select=[(_UTF-16LE'search_rt:' CONCAT ((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT _UTF-16LE':traffic:set:' CONCAT getItemId(extendFields)) AS redisKey, requestTime AS fieldName], where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') > _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL)]) -> SinkConversionToTuple2 -> Sink: Unnamed (3/8) (e0452995af60f6ed941b8dedd078def3).


Re: Flink 1.10 SQL: help needed with an error

Posted by godfrey he <go...@gmail.com>.
Hi, which Flink version are you using? The NPE is thrown inside the StreamTask class, so this looks like a bug.


Re: Flink 1.10 SQL: help needed with an error

Posted by godfrey he <go...@gmail.com>.
Hi chenkaibit,

You are welcome to contribute the fix back to the community.


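chenkaibit <ch...@163.com> wrote on Tue, Jun 9, 2020 at 10:34 AM:

> I have run into this problem too. It may be a checkpoint bug. After I modified the Flink source code the error no longer occurred; you can refer to this patch:
> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19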

Re: Flink 1.10 SQL: help needed with an error

Posted by chenkaibit <ch...@163.com>.
I have run into this problem too. It may be a checkpoint bug. After I modified the Flink source code the error no longer occurred; you can refer to this patch: https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19

