Posted to jira@kafka.apache.org by "Hanlin Liu (JIRA)" <ji...@apache.org> on 2018/02/27 07:55:00 UTC

[jira] [Updated] (KAFKA-6595) Kafka connect commit offset incorrectly.

     [ https://issues.apache.org/jira/browse/KAFKA-6595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hanlin Liu updated KAFKA-6595:
------------------------------
    Description: 
Version: Confluent Platform 3.2.0 (Kafka 0.10.2.0)

SourceTaskOffsetCommitter calls commitOffsets() and waits for all incomplete records to be sent. If the task is stopped while that commit is still in progress, the finally block in WorkerSourceTask.execute() calls commitOffsets() a second time. This logs {{Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this}} and throws {{ConnectException: OffsetStorageWriter is already flushing}}. The exception causes the producer to be closed without waiting for the offset flush timeout.
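The conflict is between two callers of the same flush. The following is a minimal sketch, not Kafka code: SimpleOffsetWriter, SerializedCommitter and DoubleFlushDemo are hypothetical names. SimpleOffsetWriter reproduces only the reported behaviour (a second beginFlush() while one is in progress is rejected). SerializedCommitter illustrates one possible way to avoid the error, by holding a single lock for the whole commit so that the task's final commit waits for the periodic one; this is an assumption for illustration and is not taken from the Kafka code base.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the "already flushing" guard seen in the log.
// It is NOT the real org.apache.kafka.connect.storage.OffsetStorageWriter.
class SimpleOffsetWriter {
    private boolean flushing = false;

    // A second beginFlush() while a flush is in progress is rejected.
    synchronized void beginFlush() {
        if (flushing) {
            throw new IllegalStateException("OffsetStorageWriter is already flushing");
        }
        flushing = true;
    }

    synchronized void endFlush() {
        flushing = false;
    }

    synchronized boolean isFlushing() {
        return flushing;
    }
}

// Hypothetical committer: the periodic committer thread and the task's finally
// block both call commitOffsets(). One lock around the whole commit makes the
// second caller wait instead of tripping the guard.
class SerializedCommitter {
    private final SimpleOffsetWriter writer = new SimpleOffsetWriter();
    private final ReentrantLock commitLock = new ReentrantLock();

    void commitOffsets(Runnable waitForOutstandingRecords) {
        commitLock.lock();
        try {
            writer.beginFlush();
            try {
                waitForOutstandingRecords.run();
            } finally {
                writer.endFlush();
            }
        } finally {
            commitLock.unlock();
        }
    }
}

class DoubleFlushDemo {
    // Runs `callers` concurrent commits and returns how many hit the guard.
    static int runConcurrentCommits(int callers) {
        SerializedCommitter committer = new SerializedCommitter();
        AtomicInteger rejected = new AtomicInteger();
        Runnable slowWait = () -> {
            try {
                Thread.sleep(20); // simulate waiting for in-flight records
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    committer.commitOffsets(slowWait);
                } catch (IllegalStateException e) {
                    rejected.incrementAndGet();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        // Unserialized: the second beginFlush() fails, as in the reported log.
        SimpleOffsetWriter raw = new SimpleOffsetWriter();
        raw.beginFlush();     // periodic committer starts flushing
        try {
            raw.beginFlush(); // task's finally block commits again
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        raw.endFlush();

        // Serialized: concurrent commits queue up instead of failing.
        System.out.println("rejections with lock: " + runConcurrentCommits(2));
    }
}
```

Running DoubleFlushDemo.main should print the rejection message for the unserialized writer and zero rejections for the serialized committer. The trade-off of serializing is that stopping the task can block for as long as the in-progress commit takes.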

After 30 seconds the producer is force-closed and all incomplete records are aborted. If {{offset.flush.timeout.ms}} is configured to be larger than 30 seconds, WorkerSourceTask treats those aborted records as having been sent within the flush timeout, so the commit that is still in progress incorrectly flushes their source offsets.
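The accounting problem can be illustrated with a hypothetical model (OffsetCommitModel is not a Kafka class) under two stated assumptions: a record's source offset is staged as soon as the record is handed to the producer, and, as the description implies, the completion callback removes the record from the outstanding set whether or not the send succeeded. Under those assumptions, records aborted by the forced close look exactly like acknowledged ones to the commit, which then flushes their offsets. commitCheckingFailures() is a hypothetical alternative that refuses to commit when any send failed.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the offset accounting described in this issue.
// It is NOT the real WorkerSourceTask.
class OffsetCommitModel {
    private final Set<Long> outstanding = new HashSet<>(); // ids of unacknowledged records
    private long stagedOffset = -1L;    // highest source offset handed to the producer
    private long committedOffset = -1L; // last source offset flushed to offset storage
    private boolean anySendFailed = false;

    // Assumption: the source offset is staged when the record is handed to the producer.
    synchronized void send(long recordId, long sourceOffset) {
        outstanding.add(recordId);
        stagedOffset = Math.max(stagedOffset, sourceOffset);
    }

    // Simulated producer callback; error != null for records aborted by a forced close.
    synchronized void onCompletion(long recordId, Exception error) {
        if (error != null) {
            anySendFailed = true;
        }
        outstanding.remove(recordId); // removed whether or not the send succeeded
    }

    // Behaviour described in the issue: "nothing outstanding" is treated as "all sent".
    synchronized boolean commitIgnoringFailures() {
        if (!outstanding.isEmpty()) {
            return false;
        }
        committedOffset = stagedOffset;
        return true;
    }

    // Hypothetical safer variant: do not flush offsets if any send failed.
    synchronized boolean commitCheckingFailures() {
        if (!outstanding.isEmpty() || anySendFailed) {
            return false;
        }
        committedOffset = stagedOffset;
        return true;
    }

    synchronized long committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        Exception aborted = new IllegalStateException("Producer is closed forcefully.");

        // Two records are in flight when the producer is force-closed.
        OffsetCommitModel ignoring = new OffsetCommitModel();
        ignoring.send(1, 100);
        ignoring.send(2, 101);
        ignoring.onCompletion(1, aborted);
        ignoring.onCompletion(2, aborted);
        System.out.println("ignoring failures: committed=" + ignoring.commitIgnoringFailures()
                + ", offset=" + ignoring.committedOffset());

        OffsetCommitModel checking = new OffsetCommitModel();
        checking.send(1, 100);
        checking.send(2, 101);
        checking.onCompletion(1, aborted);
        checking.onCompletion(2, aborted);
        System.out.println("checking failures: committed=" + checking.commitCheckingFailures()
                + ", offset=" + checking.committedOffset());
    }
}
```

With both records aborted, the first variant commits source offset 101 even though nothing was written, while the second commits nothing and leaves the offset at -1.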

 
{code:java}
2018-02-27 02:59:33,134 INFO  [] Stopping connector dp-sqlserver-connector-dptask_455   [pool-3-thread-6][Worker.java:254]
2018-02-27 02:59:33,134 INFO  [] Stopped connector dp-sqlserver-connector-dptask_455   [pool-3-thread-6][Worker.java:264]
2018-02-27 02:59:34,121 ERROR [] Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this   [pool-1-thread-13][OffsetStorageWriter.java:110]
2018-02-27 02:59:34,121 ERROR [] Task dp-sqlserver-connector-dptask_455-0 threw an uncaught and unrecoverable exception   [pool-1-thread-13][WorkerTask.java:141]
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
        at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:294)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2018-02-27 03:00:00,734 ERROR [] Graceful stop of task dp-sqlserver-connector-dptask_455-0 failed.   [pool-3-thread-1][Worker.java:405]
2018-02-27 03:00:04,126 INFO  [] Proceeding to force close the producer since pending requests could not be completed within timeout 30 ms.   [pool-1-thread-13][KafkaProducer.java:713]
2018-02-27 03:00:04,127 ERROR [] dp-sqlserver-connector-dptask_455-0 failed to send record to dptask_455.JF_TEST_11.jf_test_tab_8: {}   [kafka-producer-network-thread | producer-31][WorkerSourceTask.java:228]
java.lang.IllegalStateException: Producer is closed forcefully.
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:522)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:502)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147)
        at java.lang.Thread.run(Thread.java:745)
2018-02-27 03:00:09,920 INFO  [] Finished WorkerSourceTask{id=dp-sqlserver-connector-dptask_455-0} commitOffsets successfully in 47088 ms   [pool-4-thread-1][WorkerSourceTask.java:371]
{code}
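The timestamps in the log can be used as a sanity check on this sequence. The sketch below uses only java.time; LogTimeline and its two helpers are hypothetical names. It computes the gap between the double-flush error and the forced producer close (about 30 seconds, even though the log message says "30 ms"), and the start time of the commit that reported success after 47088 ms. That start time falls before the connector was stopped at 02:59:33,134, which is consistent with it being the periodic commit from SourceTaskOffsetCommitter, although the log alone does not prove this.

```java
import java.time.Duration;
import java.time.LocalTime;

class LogTimeline {
    // Milliseconds between two time-of-day stamps taken from the same day.
    static long millisBetween(String from, String to) {
        return Duration.between(LocalTime.parse(from), LocalTime.parse(to)).toMillis();
    }

    // Start time of an operation that ended at `end` after `durationMs` milliseconds.
    static LocalTime startOf(String end, long durationMs) {
        return LocalTime.parse(end).minus(Duration.ofMillis(durationMs));
    }

    public static void main(String[] args) {
        // Log timestamps, with the comma before the milliseconds replaced by a dot
        // so that LocalTime.parse() accepts them.
        System.out.println("double-flush error -> forced producer close: "
                + millisBetween("02:59:34.121", "03:00:04.126") + " ms");
        System.out.println("commit finished at 03:00:09.920 started at: "
                + startOf("03:00:09.920", 47088L));
    }
}
```

The first line reports 30005 ms and the second reports 02:59:22.832, roughly ten seconds before the connector stop was logged.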
 

 


> Kafka connect commit offset incorrectly.
> ----------------------------------------
>
>                 Key: KAFKA-6595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6595
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.0
>            Reporter: Hanlin Liu
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)