Posted to jira@kafka.apache.org by "Chris Egerton (Jira)" <ji...@apache.org> on 2023/03/09 15:50:00 UTC

[jira] [Commented] (KAFKA-14799) Source tasks fail if connector attempts to abort empty transaction

    [ https://issues.apache.org/jira/browse/KAFKA-14799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698469#comment-17698469 ] 

Chris Egerton commented on KAFKA-14799:
---------------------------------------

Right now I'm leaning toward option 1, probably with a {{WARN}}-level log message.
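For comparison, here is a rough, self-contained sketch of the three candidate behaviours listed in the quoted issue below, for the case where a task asks to abort before any transaction has been begun. Option 1 is modeled as a no-op plus a WARN-level log line, as suggested above. This is a toy model, not Connect's actual {{WorkerTransactionContext}}. All names ({{ToyTransactionContext}}, {{EmptyAbortPolicy}}, {{markTransactionBegun}}, {{shouldAbortAfterBatch}}) are hypothetical. {{java.util.logging}} stands in for the SLF4J logging Connect really uses.

```java
import java.util.logging.Logger;

// Hypothetical toy model; NOT Kafka Connect's WorkerTransactionContext.
// The three candidate behaviours when a task asks to abort while no transaction has been begun.
enum EmptyAbortPolicy { NO_OP_WITH_WARNING, THROW_TO_TASK, FAIL_TASK }

class ToyTransactionContext {
    // java.util.logging is a stand-in for the SLF4J logger used by Connect.
    private static final Logger LOG = Logger.getLogger(ToyTransactionContext.class.getName());

    private final EmptyAbortPolicy policy;
    private boolean transactionOpen = false; // true once the framework has begun a transaction
    private boolean abortPending = false;    // a valid abort request the framework must honor
    private boolean failRequested = false;   // option 3: fail the task from the framework side

    ToyTransactionContext(EmptyAbortPolicy policy) {
        this.policy = policy;
    }

    // Framework side: called once a batch with records causes beginTransaction().
    void markTransactionBegun() {
        transactionOpen = true;
    }

    // Task side: plays the role of TransactionContext::abortTransaction.
    void abortTransaction() {
        if (transactionOpen) {
            abortPending = true;
            return;
        }
        switch (policy) {
            case NO_OP_WITH_WARNING:
                // Option 1: drop the request, but leave a WARN-level trace in the logs.
                LOG.warning("Ignoring request to abort transaction; no transaction has been begun");
                break;
            case THROW_TO_TASK:
                // Option 2: the task may catch this and keep going; if it doesn't, the task fails.
                throw new IllegalStateException("Cannot abort: no transaction has been begun");
            case FAIL_TASK:
                // Option 3: only record the request here; the framework fails the task later,
                // so the task never gets a chance to swallow the error.
                failRequested = true;
                break;
        }
    }

    // Framework side: called after the batch returned from poll() has been dispatched.
    // Returns true if the open transaction should now be aborted on the producer.
    boolean shouldAbortAfterBatch() {
        if (failRequested) {
            throw new IllegalStateException("Task requested abort of a transaction that was never begun");
        }
        boolean abort = abortPending;
        abortPending = false;
        if (abort) {
            transactionOpen = false;
        }
        return abort;
    }
}
```

With {{NO_OP_WITH_WARNING}}, an abort request that arrives before any records have been dispatched only produces a log line, and {{shouldAbortAfterBatch()}} returns false, so the producer is never asked to abort a transaction it hasn't begun.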

> Source tasks fail if connector attempts to abort empty transaction
> ------------------------------------------------------------------
>
>                 Key: KAFKA-14799
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14799
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>
> If a source task invokes [TransactionContext::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/TransactionContext.html#abortTransaction()] while the current transaction is empty, and then returns an empty batch of records from the next (or current) invocation of {{SourceTask::poll}}, the task will fail.
> This is because the Connect framework will honor the transaction abort request by invoking [KafkaProducer::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction()], but without having first invoked [KafkaProducer::beginTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#beginTransaction()] (since no records had been received from the task), which leads to an {{IllegalStateException}}.
> An example stack trace for this scenario:
> {quote}[2023-03-09 10:41:25,053] ERROR [exactlyOnceQuestionMark|task-0] ExactlyOnceWorkerSourceTask\{id=exactlyOnceQuestionMark-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:210)
> java.lang.IllegalStateException: TransactionalId exactly-once-source-integration-test-exactlyOnceQuestionMark-0: Invalid transition attempted from state READY to state ABORTING_TRANSACTION
>     at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
>     at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:967)
>     at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginAbort$3(TransactionManager.java:269)
>     at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1116)
>     at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
>     at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)
>     at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.abortTransaction(ExactlyOnceWorkerSourceTask.java:495)
>     at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.shouldCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:473)
>     at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$TransactionBoundaryManager.maybeCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:398)
>     at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.batchDispatched(ExactlyOnceWorkerSourceTask.java:186)
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:362)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
>     at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> {quote}
>  
> As far as a fix goes, we have a few options:
>  # Gracefully handle this case by translating the call to {{TransactionContext::abortTransaction}} into a no-op
>  # Throw an exception (probably an {{IllegalStateException}}) from {{TransactionContext::abortTransaction}}, which would fail the task by default, but would give it the option to catch the exception and continue processing if it chose to
>  # Forcibly fail the task without giving it the chance to swallow an exception, using a similar strategy to how we fail tasks that request that a transaction be committed and aborted for the same record (see [here|https://github.com/apache/kafka/blob/c5240c0390892fe9ecbe5285185c370e7be8b2aa/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTransactionContext.java#L78-L86])
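To make the failure mode in the quoted description concrete, here is a small, self-contained model of the sequence: the framework calls {{beginTransaction}} lazily, only once a batch actually contains records, but forwards the task's abort request regardless. The state names and the exception message mirror the stack trace above. Everything else ({{ToyTransactionalProducer}}, {{ToyFramework}}, {{dispatchBatch}}, and the transition table) is hypothetical and much simpler than the real producer {{TransactionManager}}.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Map;

// States borrowed from the stack trace; the real producer has more.
enum TxnState { READY, IN_TRANSACTION, COMMITTING_TRANSACTION, ABORTING_TRANSACTION }

// Hypothetical, heavily simplified transactional producer; NOT Kafka's TransactionManager.
class ToyTransactionalProducer {
    // Allowed source -> target transitions in this toy model only.
    private static final Map<TxnState, EnumSet<TxnState>> VALID = Map.of(
            TxnState.READY, EnumSet.of(TxnState.IN_TRANSACTION),
            TxnState.IN_TRANSACTION,
            EnumSet.of(TxnState.COMMITTING_TRANSACTION, TxnState.ABORTING_TRANSACTION),
            TxnState.COMMITTING_TRANSACTION, EnumSet.of(TxnState.READY),
            TxnState.ABORTING_TRANSACTION, EnumSet.of(TxnState.READY));

    private TxnState state = TxnState.READY;

    TxnState state() {
        return state;
    }

    private void transitionTo(TxnState target) {
        if (!VALID.get(state).contains(target)) {
            throw new IllegalStateException(
                    "Invalid transition attempted from state " + state + " to state " + target);
        }
        state = target;
    }

    void beginTransaction() {
        transitionTo(TxnState.IN_TRANSACTION);
    }

    void commitTransaction() {
        transitionTo(TxnState.COMMITTING_TRANSACTION);
        transitionTo(TxnState.READY);
    }

    void abortTransaction() {
        transitionTo(TxnState.ABORTING_TRANSACTION);
        transitionTo(TxnState.READY);
    }
}

// Hypothetical stand-in for the framework's per-batch handling.
class ToyFramework {
    final ToyTransactionalProducer producer = new ToyTransactionalProducer();
    private boolean transactionOpen = false;

    void dispatchBatch(List<String> records, boolean abortRequested) {
        // A transaction is begun only when the task actually returned records.
        if (!records.isEmpty() && !transactionOpen) {
            producer.beginTransaction();
            transactionOpen = true;
        }
        // (records would be sent to the producer here)
        if (abortRequested) {
            // Mirrors the bug: the abort is forwarded even if beginTransaction() was never called.
            producer.abortTransaction();
            transactionOpen = false;
        }
    }
}
```

Dispatching a non-empty batch with an abort request leaves the producer back in {{READY}}, while dispatching an empty batch with an abort request throws an {{IllegalStateException}} about the {{READY}} to {{ABORTING_TRANSACTION}} transition, matching the scenario in the stack trace.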



--
This message was sent by Atlassian Jira
(v8.20.10#820010)