Posted to dev@flink.apache.org by "shizhengchao (Jira)" <ji...@apache.org> on 2021/12/01 07:05:00 UTC

[jira] [Created] (FLINK-25124) A deadlock occurs when the jdbc sink uses two consecutive dimension tables to associate

shizhengchao created FLINK-25124:
------------------------------------

             Summary: A deadlock occurs when the jdbc sink uses two consecutive dimension tables to associate
                 Key: FLINK-25124
                 URL: https://issues.apache.org/jira/browse/FLINK-25124
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 1.13.1
            Reporter: shizhengchao


 

The SQL statement is as follows:
{code:java}
INSERT INTO imei_phone_domestic_realtime
  SELECT
  t.data.imei AS imei,
  CAST(t.data.register_date_key AS bigint) AS register_date_key,
  c.agent_type AS channel_name,
  c.agent_short_name,
  c.agent_name,
  c.agent_chinese_name,
  c.isforeign AS agent_market_type,
  p.seriename AS series_name,
  p.salename AS sale_name,
  p.devname AS dev_name,
  p.devnamesource AS dev_name_source,
  p.color,
  p.isforeign AS product_market_type,
  p.carrier,
  p.lcname AS life_cycle,
  IFNULL(p.shipping_price,0) AS shipping_price,
  IFNULL(p.retail_price,0) AS  retail_price
  FROM kafka_imei_phone_domestic_realtime AS t
  LEFT JOIN dim_product FOR SYSTEM_TIME AS OF t.proctime AS p ON p.pn=t.item_code
  LEFT JOIN dim_customer FOR SYSTEM_TIME AS OF t.proctime AS c ON c.customer_code=t.customer_code
  WHERE t.eventType='update';
{code}
With this statement the job occasionally deadlocks. The thread dump at that point shows:
{code:java}
"jdbc-upsert-output-format-thread-1" Id=84 BLOCKED on org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat@649788af owned by "Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, kafka_imei_phone_domestic_realtime]], fields=[data, eventType]) -> Calc(select=[data, data.item_code AS $f3], where=[(eventType = _UTF-16LE'update':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> LookupJoin(table=[default_catalog.default_database.dim_product], joinType=[LeftOuterJoin], async=[false], lookup=[pn=$f3], select=[data, $f3, pn, color, isforeign, devname, salename, seriename, lcname, carrier, devnamesource, shipping_price, retail_price]) -> Calc(select=[data, color, isforeign, devname, salename, seriename, lcname, carrier, devnamesource, shipping_price, retail_price, data.customer_code AS $f31]) -> LookupJoin(table=[default_catalog.default_database.dim_customer], joinType=[LeftOuterJoin], async=[false], lookup=[customer_code=$f31], select=[data, color, isforeign, devname, salename, seriename, lcname, carrier, devnamesource, shipping_price, retail_price, $f31, customer_code, agent_short_name, agent_name, isforeign, agent_type, agent_chinese_name]) -> Calc(select=[data.imei AS imei, CAST(data.register_date_key) AS register_date_key, agent_type AS channel_name, agent_short_name, agent_name, agent_chinese_name, isforeign0 AS agent_market_type, seriename AS series_name, salename AS sale_name, devname AS dev_name, devnamesource AS dev_name_source, color, isforeign AS product_market_type, carrier, lcname AS life_cycle, IFNULL(shipping_price, 0:DECIMAL(10, 0)) AS shipping_price, IFNULL(retail_price, 0:DECIMAL(10, 0)) AS retail_price]) -> NotNullEnforcer(fields=[imei]) -> Sink: Sink(table=[default_catalog.default_database.imei_phone_domestic_realtime], fields=[imei, register_date_key, channel_name, agent_short_name, agent_name, agent_chinese_name, agent_market_type, series_name, sale_name, dev_name, dev_name_source, color, 
product_market_type, carrier, life_cycle, shipping_price, retail_price]) (6/12)#0" Id=82
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:124)
    -  blocked on org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat@649788af
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat$$Lambda$344/21845506.run(Unknown Source)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ...

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@325612a2 {code}
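For readers unfamiliar with this kind of dump: the scheduled flush thread is BLOCKED on the JdbcBatchingOutputFormat monitor, which the task thread currently owns. The sketch below illustrates only that thread state, using a hypothetical BatchingWriter class that is not Flink code. It does not reproduce the deadlock and makes no claim about its root cause.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class MonitorContentionSketch {

    /** Hypothetical stand-in for a batching sink; NOT the Flink class. */
    static final class BatchingWriter {
        private int pending = 0;
        private int flushed = 0;

        synchronized void write() { pending++; }

        synchronized void flush() { flushed += pending; pending = 0; }
    }

    /** Returns the state of a flusher thread while another thread owns the writer's monitor. */
    static Thread.State observeFlusherState() throws InterruptedException {
        BatchingWriter writer = new BatchingWriter();
        CountDownLatch flusherStarted = new CountDownLatch(1);

        Thread flusher = new Thread(() -> {
            flusherStarted.countDown();
            writer.flush(); // needs the writer's monitor
        }, "flusher-sketch");
        flusher.setDaemon(true);

        Thread.State observed;
        synchronized (writer) { // this thread plays the role of the task thread
            writer.write();
            flusher.start();
            flusherStarted.await();
            // Poll until the flusher is parked on the monitor (bounded wait).
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (flusher.getState() != Thread.State.BLOCKED
                    && System.nanoTime() < deadline) {
                Thread.sleep(1);
            }
            observed = flusher.getState();
        }
        // The monitor is released here, so the flusher can finish.
        flusher.join(5000);
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("flusher state while monitor is held: " + observeFlusherState());
    }
}
```

In the sketch the owning thread releases the monitor and the flusher proceeds. In the reported job the owning task thread apparently never releases it, which is why the flush thread stays BLOCKED.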
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)