Posted to issues@flink.apache.org by "shizhengchao (Jira)" <ji...@apache.org> on 2022/10/27 02:49:00 UTC

[jira] [Commented] (FLINK-29772) Kafka table source scan blocked

    [ https://issues.apache.org/jira/browse/FLINK-29772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624834#comment-17624834 ] 

shizhengchao commented on FLINK-29772:
--------------------------------------

The deadlock occurs with a Kafka interval join when using RocksDB.

> Kafka table source scan blocked
> -------------------------------
>
>                 Key: FLINK-29772
>                 URL: https://issues.apache.org/jira/browse/FLINK-29772
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.2
>            Reporter: shizhengchao
>            Priority: Major
>
> {code:java}
> //
> "Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, watermark=[-(toTimeStamps($2), 10000:INTERVAL SECOND)]]], fields=[data_type, cluster_name, server_time, server_time_s, client_time, client_time_s, imei, request_id, owner_id, service_id, content_id, sign_id, receiver_type, msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi, array_info_imei, phone, channel_id, process_time, code, msg, receiver, content_type, android_version, apk_version]) -> Calc(select=[data_type, server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER SET "UTF-16LE") AND server_time IS NOT NULL), server_time, CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 10))))) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER SET "UTF-16LE") AND client_time IS NOT NULL), client_time, CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 10))))) AS client_time, IF(((data_type = _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=77 BLOCKED on java.lang.Object@4aa3fe44 owned by "Legacy Source Thread - 
Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle, watermark=[-(toTimeStamps($2), 10000:INTERVAL SECOND)]]], fields=[data_type, cluster_name, server_time, server_time_s, client_time, client_time_s, imei, request_id, owner_id, service_id, content_id, sign_id, receiver_type, msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi, array_info_imei, phone, channel_id, process_time, code, msg, receiver, content_type, android_version, apk_version]) -> Calc(select=[data_type, server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) -> Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#', _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'), (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER SET "UTF-16LE") AND server_time IS NOT NULL), server_time, CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 10))))) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'), (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER SET "UTF-16LE") AND client_time IS NOT NULL), client_time, CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0, 10))))) AS client_time, IF(((data_type = _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime], where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19) CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=87
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>     -blocked on java.lang.Object@4aa3fe44
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$556/1624913200.run(Unknown Source)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)