Posted to issues@flink.apache.org by "Avinash (Jira)" <ji...@apache.org> on 2022/11/08 08:47:00 UTC

[jira] [Updated] (FLINK-29926) File source continuous monitoring mode ignoring files during savepoint upgrade mode

     [ https://issues.apache.org/jira/browse/FLINK-29926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Avinash updated FLINK-29926:
----------------------------
    Description: 
During a stateful application upgrade using the Flink Kubernetes Operator, the source created by StreamExecutionEnvironment.readFile() in FileProcessingMode.PROCESS_CONTINUOUSLY mode fails to detect any new changes made to the same file in the monitored directory.
 
*Background*: We have a fresh deployment of the application, managed by the Kubernetes Operator with savepoint as the upgrade mode and checkpointing enabled. The env.readFile() operator in FileProcessingMode.PROCESS_CONTINUOUSLY mode continuously monitors the directory (an S3 prefix) for changes and checkpoints its state at the configured interval.
{noformat}
2022-11-07 10:47:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path: s3://test-app/configs
...
...
2022-11-07 10:47:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667817365000 and global mod time= 1667817365000
...
...
2022-11-07 10:51:40.896 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - ContinuousFileMonitoringFunction checkpointed 1667817365000.{noformat}
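To make the ignore rule in these logs concrete, here is a simplified, hypothetical Java model (ModTimeFilterModel is an illustrative name, not a Flink class). It assumes a file is skipped when its modification time is less than or equal to the stored global mod time, which matches the log line above where both values are 1667817365000 and the file is ignored. The value returned by snapshot() stands in for what the checkpoint stores.

```java
import java.util.List;

// Hypothetical, simplified model of the filtering seen in the DEBUG logs.
// Assumption: a file is skipped when modTime <= globalModificationTime.
class ModTimeFilterModel {
    // Starts below any real timestamp so the first scan picks up every file.
    private long globalModificationTime = Long.MIN_VALUE;

    // True if the file is skipped (unchanged or older than what was processed).
    boolean shouldIgnore(long modificationTime) {
        return modificationTime <= globalModificationTime;
    }

    // One directory scan: eligibility is checked against the value at scan start,
    // then the stored value advances to the newest forwarded mod time.
    int scan(List<Long> fileModTimes) {
        int forwarded = 0;
        long maxSeen = globalModificationTime;
        for (long modTime : fileModTimes) {
            if (!shouldIgnore(modTime)) {
                forwarded++;
                maxSeen = Math.max(maxSeen, modTime);
            }
        }
        globalModificationTime = maxSeen;
        return forwarded;
    }

    // The value a regular checkpoint would store.
    long snapshot() {
        return globalModificationTime;
    }

    public static void main(String[] args) {
        ModTimeFilterModel source = new ModTimeFilterModel();
        // First scan: control-event-config.json (mod time from the log) is forwarded.
        System.out.println(source.scan(List.of(1667817365000L))); // 1
        // Next scan with the file unchanged: equal mod time is ignored.
        System.out.println(source.scan(List.of(1667817365000L))); // 0
        System.out.println(source.snapshot()); // 1667817365000
    }
}
```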
Next, we upgrade the application using the Kubernetes Operator. For a savepoint upgrade, the job is suspended via cancel with savepoint.
As part of this, the source's cancel() method is called, which in turn sets globalModificationTime = Long.MAX_VALUE, and only then is the savepoint taken.
{noformat}
2022-11-07 10:54:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667817365000 and global mod time= 1667817365000
...
2022-11-07 10:55:12.899 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - ContinuousFileMonitoringFunction checkpointed 9223372036854775807
....
2022-11-07 10:55:13.090 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Closed File Monitoring Source for path: s3://test-app/{noformat}
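A hypothetical, simplified model of this ordering (CancelThenSavepointModel is an illustrative name, not Flink code): a regular checkpoint stores the last processed mod time, but once cancel() has overwritten the field, the savepoint stores Long.MAX_VALUE, as in the "checkpointed 9223372036854775807" log line above.

```java
// Hypothetical model of cancel-with-savepoint as described in this report.
// Assumption: cancel() overwrites the stored mod time before the savepoint.
class CancelThenSavepointModel {
    private long globalModificationTime = Long.MIN_VALUE;
    private boolean running = true;

    // Record that a file with this mod time was forwarded.
    void markProcessed(long modTime) {
        globalModificationTime = Math.max(globalModificationTime, modTime);
    }

    // Mirrors the reported behavior: the watermark is replaced by Long.MAX_VALUE.
    void cancel() {
        globalModificationTime = Long.MAX_VALUE;
        running = false;
    }

    // The value a checkpoint or savepoint would store at this moment.
    long snapshot() {
        return globalModificationTime;
    }

    boolean isRunning() {
        return running;
    }

    public static void main(String[] args) {
        CancelThenSavepointModel source = new CancelThenSavepointModel();
        source.markProcessed(1667817365000L);
        System.out.println(source.snapshot()); // 1667817365000 (regular checkpoint)
        source.cancel();                       // suspend via cancel with savepoint
        System.out.println(source.snapshot()); // 9223372036854775807 (savepoint)
    }
}
```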
As a result, globalModificationTime changes from 1667817365000 to Long.MAX_VALUE (9223372036854775807) and is stored in the savepoint state. Once the application restarts with the new changes, the env.readFile() operator restores this state, in which globalModificationTime = Long.MAX_VALUE, and ignores every change made to the file after the upgrade.
{noformat}
2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='INFO' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Restoring state for the ContinuousFileMonitoringFunction
....
2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - ContinuousFileMonitoringFunction retrieved a global mod time of 9223372036854775807
....
2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path: s3://test-app/configs
....
2022-11-07 11:00:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667821399000 and global mod time= 9223372036854775807
...
...
2022-11-07 11:01:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667821399000 and global mod time= 9223372036854775807
...
2022-11-07 11:02:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667821399000 and global mod time= 9223372036854775807{noformat}
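The effect on restore can be sketched with another hypothetical model (RestoreModel is illustrative only), assuming the restored value is used as-is with the same modTime <= globalModificationTime rule. Since no real timestamp exceeds Long.MAX_VALUE, the file updated after the upgrade (mod time 1667821399000 in the logs) is never forwarded, whereas restoring the last regular checkpoint value (1667817365000) would pick it up.

```java
import java.util.List;

// Hypothetical model of the restore path: the stored mod time is taken from
// the savepoint unchanged and the same ignore rule is applied on every scan.
class RestoreModel {
    private final long globalModificationTime;

    RestoreModel(long restoredGlobalModTime) {
        this.globalModificationTime = restoredGlobalModTime;
    }

    boolean shouldIgnore(long modTime) {
        return modTime <= globalModificationTime;
    }

    // Number of files a scan would forward downstream.
    long countForwarded(List<Long> modTimes) {
        return modTimes.stream().filter(t -> !shouldIgnore(t)).count();
    }

    public static void main(String[] args) {
        long updatedFile = 1667821399000L; // mod time after the upgrade, from the log
        RestoreModel fromSavepoint = new RestoreModel(Long.MAX_VALUE);
        RestoreModel fromCheckpoint = new RestoreModel(1667817365000L);
        System.out.println(fromSavepoint.countForwarded(List.of(updatedFile)));  // 0
        System.out.println(fromCheckpoint.countForwarded(List.of(updatedFile))); // 1
    }
}
```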
Cause: The issue appears to be caused by globalModificationTime being reassigned to Long.MAX_VALUE in cancel():
[https://github.com/apache/flink/blob/a77747892b1724fa5ec388c2b0fe519db32664e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java#L389]
 
 


> File source continuous monitoring mode ignoring files during savepoint upgrade mode
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-29926
>                 URL: https://issues.apache.org/jira/browse/FLINK-29926
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Avinash
>            Priority: Critical
>              Labels: Flink, ReadFile



--
This message was sent by Atlassian Jira
(v8.20.10#820010)