Posted to dev@flink.apache.org by "Vicky Papavasileiou (Jira)" <ji...@apache.org> on 2023/02/07 15:44:00 UTC

[jira] [Created] (FLINK-30945) FTS does not support multiple writers into the same table and topic

Vicky Papavasileiou created FLINK-30945:
-------------------------------------------

             Summary: FTS does not support multiple writers into the same table and topic
                 Key: FLINK-30945
                 URL: https://issues.apache.org/jira/browse/FLINK-30945
             Project: Flink
          Issue Type: Bug
          Components: Table Store
            Reporter: Vicky Papavasileiou


When two different streaming jobs INSERT INTO the same table and Kafka topic, the second job is never able to make progress: its transactions are constantly aborted because its Kafka producer gets fenced.

FTS should set a unique transactionalIdPrefix per job so that transactions of different jobs do not clash.
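
For illustration, a minimal sketch of the idea using Flink's standalone KafkaSink builder. This is not the shaded two-phase-commit sink that FTS actually uses (see the stack trace below), and the class and prefix names here are made up for the example; the topic and bootstrap servers match the sample queries further down. The point is only that each submitted job gets its own prefix, so its producers never reuse another job's transactional IDs.
{code:java}
import java.util.UUID;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class PerJobTransactionalIdPrefix {

    /** Builds an exactly-once Kafka sink whose transactional IDs are unique to this job. */
    public static KafkaSink<String> buildLogSink() {
        // A prefix that differs for every submitted job. Flink derives each
        // producer's transactional.id from this prefix plus subtask and
        // checkpoint counters, so two jobs writing to the same topic no
        // longer fence each other's producers.
        String perJobPrefix = "word_count_log-" + UUID.randomUUID();

        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("word_count_log")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix(perJobPrefix)
                .build();
    }
}
{code}
Without such a per-job prefix, the second job keeps failing as in the log below: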
{code:java}
2023-02-06 17:13:36,088 WARN org.apache.flink.runtime.taskmanager.Task [] - Writer -> Global Committer -> Sink: end (1/1)#0 (8cf4197af9716623c3c19e7fa3d7c071_b5c8d46f3e7b141acf271f12622e752b_0_0) switched from RUNNING to FAILED with failure cause:
org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:323)
    at org.apache.flink.table.store.connector.sink.StoreWriteOperator.notifyCheckpointComplete(StoreWriteOperator.java:175)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:479)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:413)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1412)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$15(StreamTask.java:1353)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$18(StreamTask.java:1392)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
{code}
Sample queries:
{code:java}
CREATE CATALOG table_store_catalog WITH (
    'type'='table-store',
    'warehouse'='s3://my-bucket/table-store'
 );
USE CATALOG table_store_catalog;
SET 'execution.checkpointing.interval' = '10 s';
CREATE TABLE word_count_kafka (
     word STRING PRIMARY KEY NOT ENFORCED,
     cnt BIGINT
 ) WITH (
     'log.system' = 'kafka',
     'kafka.bootstrap.servers' = 'broker:9092',
     'kafka.topic' = 'word_count_log'
 );
CREATE TEMPORARY TABLE word_table (
     word STRING
 ) WITH (
     'connector' = 'datagen',
     'fields.word.length' = '1'
 );
{code}

And the two INSERT jobs (the same statement submitted twice, as two separate streaming jobs):
{code:java}
INSERT INTO word_count_kafka SELECT word, COUNT(*) FROM word_table GROUP BY word;{code}
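
A minimal reproduction sketch, assuming the catalog and the word_count_kafka table from the DDL above already exist; the class name and program structure are illustrative only. Running this program twice submits two independent streaming jobs writing into the same table and Kafka log topic, and the second one then fails with the ProducerFencedException shown above.
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SubmitWordCountInsert {

    public static void main(String[] args) {
        // Each run of this program submits one detached streaming INSERT job.
        // Run it twice to get two writers into the same table and topic.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.getConfig()
                .getConfiguration()
                .setString("execution.checkpointing.interval", "10 s");

        // Catalogs are registered per session, so this is repeated per run;
        // both runs point at the same table-store warehouse.
        tEnv.executeSql(
                "CREATE CATALOG table_store_catalog WITH ("
                        + " 'type' = 'table-store',"
                        + " 'warehouse' = 's3://my-bucket/table-store')");
        tEnv.executeSql("USE CATALOG table_store_catalog");

        // Temporary tables are session-scoped, so the datagen source is
        // re-created here; word_count_kafka was already created by the DDL above.
        tEnv.executeSql(
                "CREATE TEMPORARY TABLE word_table (word STRING) WITH ("
                        + " 'connector' = 'datagen',"
                        + " 'fields.word.length' = '1')");

        // Submits the streaming INSERT job and returns once it is accepted.
        tEnv.executeSql(
                "INSERT INTO word_count_kafka"
                        + " SELECT word, COUNT(*) FROM word_table GROUP BY word");
    }
}
{code}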



--
This message was sent by Atlassian Jira
(v8.20.10#820010)