Posted to dev@flink.apache.org by "vmaster.cc (Jira)" <ji...@apache.org> on 2021/10/14 05:42:00 UTC
[jira] [Created] (FLINK-24539) ChangelogNormalize operator tooks
too long time to INITIALIZING until failed
vmaster.cc created FLINK-24539:
----------------------------------
Summary: ChangelogNormalize operator tooks too long time to INITIALIZING until failed
Key: FLINK-24539
URL: https://issues.apache.org/jira/browse/FLINK-24539
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing, Runtime / Task
Affects Versions: 1.13.1
Environment: Flink version: 1.13.1
TaskManager memory:
!image-2021-10-14-13-36-56-899.png|width=578,height=318!
JobManager memory:
!image-2021-10-14-13-37-51-445.png|width=578,height=229!
Reporter: vmaster.cc
Attachments: image-2021-10-14-13-19-08-215.png, image-2021-10-14-13-36-56-899.png, image-2021-10-14-13-37-51-445.png, taskmanager_container_e11_1631768043929_0012_01_000004_log.txt
I'm using Debezium to produce CDC events from MySQL. Because Debezium only guarantees at-least-once delivery, I have to set 'table.exec.source.cdc-events-duplicate=true'.
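For context, a minimal sketch of the kind of setup described here. The table name, column types, topic, and broker address are placeholders and not taken from the actual job; only the column names and the key c_id come from the operator names in the log. The option only takes effect when the source declares a PRIMARY KEY, which is what ChangelogNormalize uses as its state key:
{code:sql}
-- Ask the planner to deduplicate change events (adds the stateful ChangelogNormalize operator).
SET 'table.exec.source.cdc-events-duplicate' = 'true';

-- Hypothetical source definition: names and types are assumptions for illustration.
CREATE TABLE t_courseware_source (
  c_id BIGINT,
  c_author_id STRING,
  c_create_time TIMESTAMP(3),
  c_state INT,
  c_is_group INT,
  PRIMARY KEY (c_id) NOT ENFORCED  -- required for deduplication by key
) WITH (
  'connector' = 'kafka',
  'topic' = 'placeholder-topic',
  'properties.bootstrap.servers' = 'placeholder-host:9092',
  'format' = 'debezium-json'
);
{code}
With this option enabled, the operator keeps state per primary key, so the state it has to restore grows with the number of distinct keys in the table.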
However, whenever the job goes down for some unknown reason, the restart always fails. I found that the ChangelogNormalize operator spends too long in the INITIALIZING state.
A screenshot and a log fragment follow:
!image-2021-10-14-13-19-08-215.png|width=567,height=293!
{code:java}
2021-10-14 12:32:33,660 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Finished building RocksDB keyed state-backend at /data3/yarn/nm/usercache/flink/appcache/application_1631768043929_0012/flink-io-f31735c3-e726-4c49-89a5-916670809b7a/job_7734977994a6a10f7cc784d50e4a1a34_op_KeyedProcessOperator_dc2290bb6f8f5cd2bd425368843494fe__1_1__uuid_6cbbe6ae-f43e-4d2a-b1fb-f0cb71f257af.
2021-10-14 12:32:33,662 INFO org.apache.flink.runtime.taskmanager.Task [] - GroupAggregate(groupBy=[teacher_id, create_day], select=[teacher_id, create_day, SUM_RETRACT($f2) AS teacher_courseware_count]) -> Calc(select=[teacher_id, create_day, CAST(teacher_courseware_count) AS teacher_courseware_count]) -> NotNullEnforcer(fields=[teacher_id, create_day]) (1/1)#143 (9cca3ef1293cc6364698381bbda93998) switched from INITIALIZING to RUNNING.
2021-10-14 12:38:07,581 INFO org.apache.flink.runtime.taskmanager.Task [] - Ignoring checkpoint aborted notification for non-running task ChangelogNormalize(key=[c_id]) -> Calc(select=[c_author_id AS teacher_id, DATE_FORMAT(c_create_time, _UTF-16LE'yyyy-MM-dd') AS create_day, IF((c_state = 10), 1, 0) AS $f2], where=[((c_is_group = 0) AND (c_author_id <> _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))]) (1/1)#143.
2021-10-14 12:38:07,581 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (2/2)#143 (cc25f9ae49c4db01ab40ff103fae43fd).
2021-10-14 12:38:07,581 INFO org.apache.flink.runtime.taskmanager.Task [] - Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (2/2)#143 (cc25f9ae49c4db01ab40ff103fae43fd) switched from RUNNING to CANCELING.
2021-10-14 12:38:07,581 INFO org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (2/2)#143 (cc25f9ae49c4db01ab40ff103fae43fd).
2021-10-14 12:38:07,583 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (1/2)#143 (5419f41a3f0cc6c2f3f4c82c87f4ae22).
2021-10-14 12:38:07,583 INFO org.apache.flink.runtime.taskmanager.Task [] - Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (1/2)#143 (5419f41a3f0cc6c2f3f4c82c87f4ae22) switched from RUNNING to CANCELING.
{code}
Notes:
1. The table holds a large amount of data, up to 500 million records.
2. Because the data volume is so large, the RocksDB state backend is used.
3. For more environment details, see the Environment section; the full log is attached.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)