Posted to user-zh@flink.apache.org by hb <34...@163.com> on 2020/06/17 03:00:23 UTC

Flink 1.10.1 SQL job fails with a Netty error, help wanted

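I have a SQL job written on Flink 1.10.1. It ran normally for the first 3 hours, and checkpoints were also fine.
Then a checkpoint failed, and the job has been stuck in the RESTARTING state ever since.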


TaskManager log:
2020-06-16 20:38:16,640 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-11, groupId=] Discovered group coordinator 172.16.30.165:9092 (id: 2147483645 rack: null)
2020-06-16 23:27:46,026 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, 
create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10) CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2) (5c29783b8f7ed8bfb1a7723f5c4216b1).
2020-06-16 23:27:46,027 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, 
create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10) CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (2/2) (ed41475641eb3c58f7504d4a16a9c19b).
2020-06-16 23:27:46,034 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, 
create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (1/2) (2d34cabe390aaadb88c2b861250b793a).
2020-06-16 23:27:46,036 INFO org.apache.flink.runtime.taskmanager.Task - Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, 
_old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (1/2) (2d34cabe390aaadb88c2b861250b793a) switched from RUNNING to FAILED.
java.lang.Exception: Could not perform checkpoint 329 for operator Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, 
_old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (1/2).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:785)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:760)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/1681613747.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: refCnt: 0
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1464)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1448)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.readLong(AbstractByteBuf.java:843)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:964)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/890021961.run(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:776)
    ... 12 more
2020-06-16 23:27:46,045 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, 
create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[order_id, shop_id, item_id, edit_time], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) -> LookupJoin(table=[JDBCTableSource(id, category_id_first, brand, category)], joinType=[InnerJoin], async=[false], lookup=[id=item_id], where=[brand IS NOT NULL], select=[order_id, shop_id, item_id, edit_time, id, brand]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, brand, order_id]) (1/2) (0527ac291f32f14675f9d5bec8ef5369).
2020-06-16 23:27:46,038 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, 
create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, item_id, 1 AS $f3], where=[((_change_column jsonHasKey _UTF-16LE'"pay_time"') AND shop_id IS NOT NULL AND item_id IS NOT NULL)]) (2/2) (2d984d0e7df4ed2a6fa1d8892fcccefc).
2020-06-16 23:27:46,038 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, 
create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, item_id, 1 AS $f3], where=[((_change_column jsonHasKey _UTF-16LE'"pay_time"') AND shop_id IS NOT NULL AND item_id IS NOT NULL)]) (1/2) (32b2422a4619f4900c5a668dfb466ec9).
2020-06-16 23:27:46,037 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, 
create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09).
2020-06-16 23:27:46,050 INFO org.apache.flink.runtime.taskmanager.Task - Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, 
_old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09) switched from RUNNING to FAILED.
java.lang.Exception: Could not perform checkpoint 329 for operator Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, 
_old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (2/2).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:785)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:760)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/1681613747.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(8) exceeds writerIndex(0): Buffer1 (freed)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1451)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.readLong(AbstractByteBuf.java:843)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:964)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/890021961.run(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:776)
    ... 12 more
2020-06-16 23:27:46,051 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, 
create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING0SUBSTRING10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09).
2020-06-16 23:27:46,036 INFO org.apache.flink.runtime.taskmanager.Task - Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10) CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2) (5c29783b8f7ed8bfb1a7723f5c4216b1) switched from RUNNING to FAILED.
java.lang.Exception: Could not perform checkpoint 329 for operator Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10) CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:785)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:760)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/1681613747.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:964)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/890021961.run(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:776)
    ... 12 more


Any pointers would be much appreciated.

Re: Flink 1.10.1 SQL job fails with a Netty error, help needed

Posted by Jark Wu <im...@gmail.com>.
This looks like a known issue: https://issues.apache.org/jira/browse/FLINK-17479

On Wed, 17 Jun 2020 at 11:00, hb <34...@163.com> wrote:

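> A SQL job written on Flink 1.10.1 ran normally for the first 3 hours, and checkpoints succeeded as well.
> Then a checkpoint failed, and the job has been stuck in the RESTARTING state ever since.
>
> TaskManager log: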
> 2020-06-16 20:38:16,640 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> [Consumer clientId=consumer-11, groupId=] Discovered group coordinator
> 172.16.30.165:9092 (id: 2147483645 rack: null)
> 2020-06-16 23:27:46,026 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10)
> CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT
> _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS
> setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')])
> -> SinkConversionToTuple2 -> Sink: Unnamed (1/2) (5c29783b8f7ed8bfb1a7723f5c4216b1).
> 2020-06-16 23:27:46,027 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10)
> CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT
> _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS
> setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')])
> -> SinkConversionToTuple2 -> Sink: Unnamed (2/2) (ed41475641eb3c58f7504d4a16a9c19b).
> 2020-06-16 23:27:46,034 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')])
> (1/2) (2d34cabe390aaadb88c2b861250b793a).
> 2020-06-16 23:27:46,036 INFO org.apache.flink.runtime.taskmanager.Task -
> Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')])
> (1/2) (2d34cabe390aaadb88c2b861250b793a) switched from RUNNING to FAILED.
> java.lang.Exception: Could not perform checkpoint 329 for operator Source:
> KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')])
> (1/2).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:785)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:760)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/1681613747.call(Unknown Source)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: refCnt: 0
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1464)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1448)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.readLong(AbstractByteBuf.java:843)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:964)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/890021961.run(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:776)
>     ... 12 more
> 2020-06-16 23:27:46,045 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[order_id, shop_id, item_id, edit_time],
> where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) -> LookupJoin
> (table=[JDBCTableSource(id, category_id_first, brand, category)],
> joinType=[InnerJoin], async=[false], lookup=[id=item_id], where=[brand IS
> NOT NULL], select=[order_id, shop_id, item_id, edit_time, id, brand]) ->
> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id,
> brand, order_id]) (1/2) (0527ac291f32f14675f9d5bec8ef5369).
> 2020-06-16 23:27:46,038 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, item_id, 1 AS $f3], where=[((_change_column jsonHasKey
> _UTF-16LE'"pay_time"') AND shop_id IS NOT NULL AND item_id IS NOT NULL)])
> (2/2) (2d984d0e7df4ed2a6fa1d8892fcccefc).
> 2020-06-16 23:27:46,038 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, item_id, 1 AS $f3], where=[((_change_column jsonHasKey
> _UTF-16LE'"pay_time"') AND shop_id IS NOT NULL AND item_id IS NOT NULL)])
> (1/2) (32b2422a4619f4900c5a668dfb466ec9).
> 2020-06-16 23:27:46,037 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')])
> (2/2) (65d0693dbe12af0d571601dfafd0ca09).
> 2020-06-16 23:27:46,050 INFO org.apache.flink.runtime.taskmanager.Task -
> Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09) switched from
> RUNNING to FAILED.
> java.lang.Exception: Could not perform checkpoint 329 for operator Source:
> KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) (2/2).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:785)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:760)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/1681613747.call(Unknown Source)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(8) exceeds writerIndex(0): Buffer 1 (freed)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1451)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.readLong(AbstractByteBuf.java:843)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:964)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/890021961.run(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:776)
>     ... 12 more
> 2020-06-16 23:27:46,051 INFO org.apache.flink.runtime.taskmanager.Task -
> Triggering cancellation of task code Source: KafkaTableSource(id,
> order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id,
> item_name, item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS
> the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09).
> 2020-06-16 23:27:46,036 INFO org.apache.flink.runtime.taskmanager.Task -
> Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10)
> CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT
> _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS
> setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2) (5c29783b8f7ed8bfb1a7723f5c4216b1)
> switched from RUNNING to FAILED.
> java.lang.Exception: Could not perform checkpoint 329 for operator Source:
> KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10)
> CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT
> _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS
> setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:785)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:760)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/1681613747.call(Unknown Source)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:964)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/890021961.run(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:776)
>     ... 12 more
>
> Any pointers would be much appreciated.