Posted to jira@kafka.apache.org by "linyue li (JIRA)" <ji...@apache.org> on 2018/11/26 03:33:00 UTC
[jira] [Updated] (KAFKA-7672) The local state not fully restored
after KafkaStream rebalanced, resulting in data loss
[ https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
linyue li updated KAFKA-7672:
-----------------------------
Description:
Normally, when a task is migrated to a new thread and no checkpoint file is found under its task folder, Kafka Streams needs to restore the local state completely from the remote changelog topic before it resumes running. However, in some scenarios we found that Kafka Streams does *NOT* restore this state even though no checkpoint was found; it just cleans the state folder and transitions to the running state directly, resulting in the loss of historical data.
To be specific, the following detailed Kafka Streams logs from our project show this scenario:
{quote}2018-10-23 08:27:07,684 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] Revoking previously assigned partitions [audittrailbatch-57]
2018-10-23 08:27:07,684 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [AuditTrailBatch-StreamThread-4] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
2018-10-23 08:27:10,856 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] (Re-)joining group
2018-10-23 08:27:53,153 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] Successfully joined group with generation 323
2018-10-23 08:27:53,153 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]
2018-10-23 08:27:53,153 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [AuditTrailBatch-StreamThread-4] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
2018-10-23 08:27:53,153 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [AuditTrailBatch-StreamThread-4] Creating producer client for task 1_1
2018-10-23 08:27:53,622 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [AuditTrailBatch-StreamThread-4] partition assignment took 469 ms.
2018-10-23 08:27:54,357 INFO org.apache.kafka.streams.processor.internals.StoreChangelogReader - stream-thread [AuditTrailBatch-StreamThread-4] *No checkpoint found for task 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* Reinitializing the task and restore its state from the beginning.
2018-10-23 08:27:54,357 INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=AuditTrailBatch-StreamThread-4-restore-consumer, groupId=] *Resetting offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.*
2018-10-23 08:27:54,653 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [AuditTrailBatch-StreamThread-4] *State transition from PARTITIONS_ASSIGNED to RUNNING*
{quote}
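The expectation described above can be written down as a small, dependency-free model. This is only a sketch of the intended invariant, not Kafka Streams code: the class `RestoreExpectation`, its method names, and the end offset of 1000 are all hypothetical and chosen for illustration.

```java
import java.util.OptionalLong;

/**
 * Hypothetical model of the restore behavior this report expects.
 * None of these names come from Kafka Streams itself.
 */
final class RestoreExpectation {

    /** Restore starts at the checkpointed offset if one exists, otherwise at the beginning (offset 0). */
    static long restoreStartOffset(OptionalLong checkpointOffset) {
        return checkpointOffset.orElse(0L);
    }

    /** The task may resume running only once the restored position has reached the changelog end offset. */
    static boolean mayTransitionToRunning(long restoredUpToOffset, long changelogEndOffset) {
        return restoredUpToOffset >= changelogEndOffset;
    }

    public static void main(String[] args) {
        long changelogEndOffset = 1_000L;               // assumed value, for illustration only
        OptionalLong noCheckpoint = OptionalLong.empty(); // the "no checkpoint found" case

        long start = restoreStartOffset(noCheckpoint);
        System.out.println("restore starts at offset " + start);
        System.out.println("may run before restoring: "
                + mayTransitionToRunning(start, changelogEndOffset));
        System.out.println("may run after full restore: "
                + mayTransitionToRunning(changelogEndOffset, changelogEndOffset));
    }
}
```

In the log above, the restore consumer is reset to offset 0 at 08:27:54,357 and the thread reports RUNNING at 08:27:54,653, which under this model is only acceptable if the whole changelog partition was replayed within that interval.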
was:
Normally, when a task is mitigated to a new thread and no checkpoint file was found under its task folder, Kafka Stream needs to restore the local state for remote changelog topic completely and then resume running. Howerver, in some scenarios, we found that Kafka Stream *NOT* restore this state even no checkpoint was found, but just clean the state folder and transition to running state directly, resulting the historic data loss.
To be specific, I will give the detailed logs for Kafka Stream in our project to show this scenario:
2018-10-23 08:27:07,684 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] Revoking previously assigned partitions [audittrailbatch-57]
2018-10-23 08:27:07,684 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [AuditTrailBatch-StreamThread-4] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
2018-10-23 08:27:10,856 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] (Re-)joining group
2018-10-23 08:27:53,153 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] Successfully joined group with generation 323
2018-10-23 08:27:53,153 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=AuditTrailBatch-StreamThread-4-consumer, groupId=AuditTrailBatch] Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]
2018-10-23 08:27:53,153 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [AuditTrailBatch-StreamThread-4] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
2018-10-23 08:27:53,153 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [AuditTrailBatch-StreamThread-4] Creating producer client for task 1_1
2018-10-23 08:27:53,622 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [AuditTrailBatch-StreamThread-4] partition assignment took 469 ms.
2018-10-23 08:27:54,357 INFO org.apache.kafka.streams.processor.internals.StoreChangelogReader - stream-thread [AuditTrailBatch-StreamThread-4] *No checkpoint found for task 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* Reinitializing the task and restore its state from the beginning.
2018-10-23 08:27:54,357 INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=AuditTrailBatch-StreamThread-4-restore-consumer, groupId=] *Resetting offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.*
2018-10-23 08:27:54,653 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [AuditTrailBatch-StreamThread-4] *State transition from PARTITIONS_ASSIGNED to RUNNING*
> The local state not fully restored after KafkaStream rebalanced, resulting in data loss
> ---------------------------------------------------------------------------------------
>
> Key: KAFKA-7672
> URL: https://issues.apache.org/jira/browse/KAFKA-7672
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0
> Reporter: linyue li
> Priority: Major
> Fix For: 2.1.0
>
>