Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/20 09:42:16 UTC

[GitHub] [flink] lvhuyen opened a new pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

lvhuyen opened a new pull request #6613:
URL: https://github.com/apache/flink/pull/6613


   [FLINK-9940] Fix - File-source continuous monitoring mode - out-of-order files were missed
   
   ## Fix the issue with ContinuousFileMonitoringFunction - out-of-order files were missed in continuous directory scanning mode.
   
   - _Cause_: In the existing directory monitoring mechanism, Flink maintains the maximum last-modified timestamp of all identified files (_globalModificationTime_), and in the next scan it ignores every file whose last-modified timestamp is equal to or earlier than that _globalModificationTime_. A file that shows up later with an older timestamp (an out-of-order file) is therefore never read.
   
   
   - _Fix_: This fix adds a parameter to the ContinuousFileMonitoringFunction constructor: readConsistencyOffset. Every scan now starts from the maximum last-modified timestamp minus this offset. A new collection, processedFiles, is also maintained; it holds all known files whose modification timestamp falls within that offset period.
   - To test this fix, flink-fs-tests has also been changed: the collection of seenFiles is now a SortedList instead of a TreeSet, so that the tests verify exactly-once file scanning rather than at-least-once.
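The mechanism described above can be illustrated with a minimal, self-contained sketch. It is a simplified model and not the PR's actual code: the class `OffsetScanSketch`, its `scan` method, and the representation of a directory listing as a map from path to modification time are hypothetical. Only the names `readConsistencyOffset`, `globalModificationTime`, `maxProcessedTime`, and `processedFiles` come from the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of an offset-based directory scan. */
final class OffsetScanSketch {
    private final long readConsistencyOffset;
    // Files at or below this timestamp are ignored by the next scan.
    private long globalModificationTime = Long.MIN_VALUE;
    // Largest modification time forwarded so far.
    private long maxProcessedTime = Long.MIN_VALUE;
    // Files already forwarded whose timestamp is still inside the offset window.
    private final Map<String, Long> processedFiles = new HashMap<>();

    OffsetScanSketch(long readConsistencyOffset) {
        if (readConsistencyOffset < 0) {
            throw new IllegalArgumentException("offset must be >= 0");
        }
        this.readConsistencyOffset = readConsistencyOffset;
    }

    /** Runs one scan over a listing (path -> modTime) and returns the newly forwarded paths. */
    List<String> scan(Map<String, Long> listing) {
        List<String> forwarded = new ArrayList<>();
        for (Map.Entry<String, Long> entry : listing.entrySet()) {
            String path = entry.getKey();
            long modTime = entry.getValue();
            if (modTime <= globalModificationTime) {
                continue; // older than the window: treated as too late
            }
            Long seen = processedFiles.get(path);
            if (seen != null && seen.longValue() == modTime) {
                continue; // inside the window but already forwarded
            }
            forwarded.add(path);
            processedFiles.put(path, modTime);
            maxProcessedTime = Math.max(maxProcessedTime, modTime);
        }
        // Saturating subtraction so that Long.MIN_VALUE does not underflow.
        long candidate = maxProcessedTime < Long.MIN_VALUE + readConsistencyOffset
                ? Long.MIN_VALUE
                : maxProcessedTime - readConsistencyOffset;
        // The lower bound only ever moves forward.
        globalModificationTime = Math.max(globalModificationTime, candidate);
        // Entries that fell out of the window no longer need to be remembered.
        processedFiles.values().removeIf(t -> t <= globalModificationTime);
        return forwarded;
    }
}
```

With an offset of 10, a file stamped 95 that appears after a file stamped 100 has been forwarded is still picked up, while the already forwarded file is not emitted again. With an offset of 0 the sketch behaves like the old logic and the late file is skipped.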
   
   ## Verifying this change
   This change is already covered by existing tests, with slight updates:
   - ContinuousFileProcessingMigrationTest.testMonitoringSourceRestore
   - ContinuousFileProcessingTest.{testFunctionRestore, testProcessContinuously}

   This change also adds a test:
   
   - ContinuousFileProcessingTest.testProcessContinuouslyWithNoteTooLateFile
   
   ## Does this pull request potentially affect one of the following parts:
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): yes (per-file). This is expected to have minimal impact.
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] lvhuyen commented on pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
lvhuyen commented on pull request #6613:
URL: https://github.com/apache/flink/pull/6613#issuecomment-721013368


   Hi @guoweiM 
   Once again, sorry for the late response.
   Regarding the use of both `globalModificationTime` and `maxProcessedTime`, I think the purpose was to avoid recalculating one from the other at every `shouldIgnore()` check. As this `ContinuousFileMonitoringFunction` class was designed as a single-threaded operator, having two mutable variables should be fine.
   Please advise.
   Thanks again.





[GitHub] [flink] flinkbot edited a comment on pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #6613:
URL: https://github.com/apache/flink/pull/6613#issuecomment-526964862


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "85434111363874caddb161d5f49f22711397ba55",
       "status" : "SUCCESS",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/125372744",
       "triggerID" : "85434111363874caddb161d5f49f22711397ba55",
       "triggerType" : "PUSH"
     }, {
       "hash" : "85434111363874caddb161d5f49f22711397ba55",
       "status" : "SUCCESS",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/125372744",
       "triggerID" : "526964862",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "b5aad1232ad0cc45e45299dc8480ed57f9ddfa55",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6259",
       "triggerID" : "b5aad1232ad0cc45e45299dc8480ed57f9ddfa55",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4d48ab513b2b38147013f2b4b824493188fb9269",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4d48ab513b2b38147013f2b4b824493188fb9269",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 85434111363874caddb161d5f49f22711397ba55 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/125372744) 
   * b5aad1232ad0cc45e45299dc8480ed57f9ddfa55 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6259) 
   * 4d48ab513b2b38147013f2b4b824493188fb9269 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] lvhuyen commented on pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
lvhuyen commented on pull request #6613:
URL: https://github.com/apache/flink/pull/6613#issuecomment-704578210


   Thanks @guoweiM.
   Please give me a bit more time to arrange my schedule to work on this PR.





[GitHub] [flink] guoweiM closed pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
guoweiM closed pull request #6613:
URL: https://github.com/apache/flink/pull/6613


   





[GitHub] [flink] lvhuyen commented on pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
lvhuyen commented on pull request #6613:
URL: https://github.com/apache/flink/pull/6613#issuecomment-778756649


   Thank you @guoweiM 





[GitHub] [flink] flinkbot edited a comment on pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #6613:
URL: https://github.com/apache/flink/pull/6613#issuecomment-526964862


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "85434111363874caddb161d5f49f22711397ba55",
       "status" : "SUCCESS",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/125372744",
       "triggerID" : "85434111363874caddb161d5f49f22711397ba55",
       "triggerType" : "PUSH"
     }, {
       "hash" : "85434111363874caddb161d5f49f22711397ba55",
       "status" : "SUCCESS",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/125372744",
       "triggerID" : "526964862",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "b5aad1232ad0cc45e45299dc8480ed57f9ddfa55",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "b5aad1232ad0cc45e45299dc8480ed57f9ddfa55",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 85434111363874caddb161d5f49f22711397ba55 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/125372744) 
   * b5aad1232ad0cc45e45299dc8480ed57f9ddfa55 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] lvhuyen commented on a change in pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
lvhuyen commented on a change in pull request #6613:
URL: https://github.com/apache/flink/pull/6613#discussion_r484131667



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
 	/** Which new data to process (see {@link FileProcessingMode}. */
 	private final FileProcessingMode watchType;
 
-	/** The maximum file modification time seen so far. */
+	/** The offset interval back from the latest file modification timestamp to scan for our-of-order files.
+	 *  Valid value for this is from 0 to Long.MAX_VALUE.
+	 *
+	 *  <p><b>NOTE: </b>: Files with (modTime > Long.MIN_VALUE + readConsistencyOffset) will NOT be read.
+	 */
+	private final long readConsistencyOffset;
+
+	/** The current modification time watermark. */
 	private volatile long globalModificationTime = Long.MIN_VALUE;
 
+	/** The maximum file modification time seen so far. */
+	private volatile long maxProcessedTime = Long.MIN_VALUE;

Review comment:
       A recent change added the initialization of globalModificationTime to the constructor, so I will make the same change for that variable as well.







[GitHub] [flink] flinkbot edited a comment on pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #6613:
URL: https://github.com/apache/flink/pull/6613#issuecomment-526964862


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "85434111363874caddb161d5f49f22711397ba55",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/125372744",
       "triggerID" : "85434111363874caddb161d5f49f22711397ba55",
       "triggerType" : "PUSH"
     }, {
       "hash" : "85434111363874caddb161d5f49f22711397ba55",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/125372744",
       "triggerID" : "526964862",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "b5aad1232ad0cc45e45299dc8480ed57f9ddfa55",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6259",
       "triggerID" : "b5aad1232ad0cc45e45299dc8480ed57f9ddfa55",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4d48ab513b2b38147013f2b4b824493188fb9269",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6260",
       "triggerID" : "4d48ab513b2b38147013f2b4b824493188fb9269",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4d48ab513b2b38147013f2b4b824493188fb9269 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6260) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] kl0u closed pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
kl0u closed pull request #6613:
URL: https://github.com/apache/flink/pull/6613


   





[GitHub] [flink] guoweiM commented on pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
guoweiM commented on pull request #6613:
URL: https://github.com/apache/flink/pull/6613#issuecomment-705278656


   Do not worry. Take your time.





[GitHub] [flink] kl0u commented on pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
kl0u commented on pull request #6613:
URL: https://github.com/apache/flink/pull/6613#issuecomment-677494490


   Sorry @lvhuyen for closing this, I am reopening it because I just noticed there is discussion on the related JIRA.





[GitHub] [flink] lvhuyen commented on a change in pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
lvhuyen commented on a change in pull request #6613:
URL: https://github.com/apache/flink/pull/6613#discussion_r516324868



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -172,6 +236,26 @@ public void initializeState(FunctionInitializationContext context) throws Except
 					LOG.debug("{} retrieved a global mod time of {}.",
 						getClass().getSimpleName(), globalModificationTime);
 				}
+				if (retrievedStates2.size() == 1 && processedFiles.size() != 0) {

Review comment:
       This logic was copied from the existing code (13 lines earlier in the same file), where the check is
   `if (retrievedStates.size() == 1 && globalModificationTime != Long.MIN_VALUE) {`
   I thought there might be edge cases in which _RestoreStates_ is called when the states have already been initialised.
   I might need @kl0u's help.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
 	/** Which new data to process (see {@link FileProcessingMode}. */
 	private final FileProcessingMode watchType;
 
-	/** The maximum file modification time seen so far. */
+	/** The offset interval back from the latest file modification timestamp to scan for our-of-order files.
+	 *  Valid value for this is from 0 to Long.MAX_VALUE.
+	 *
+	 *  <p><b>NOTE: </b>: Files with (modTime > Long.MIN_VALUE + readConsistencyOffset) will NOT be read.
+	 */
+	private final long readConsistencyOffset;
+
+	/** The current modification time watermark. */
 	private volatile long globalModificationTime = Long.MIN_VALUE;

Review comment:
       As I explained in the corresponding JIRA ticket https://issues.apache.org/jira/browse/FLINK-9940, yes, this is needed. 







[GitHub] [flink] guoweiM commented on a change in pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #6613:
URL: https://github.com/apache/flink/pull/6613#discussion_r460665955



##########
File path: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
##########
@@ -1054,11 +1095,22 @@ private static int getLineNo(String line) {
 	}
 
 	/**
-	 * Create continuous monitoring function with 1 reader-parallelism and interval: {@link #INTERVAL}.
+	 * Create continuous monitoring function with 1 reader-parallelism, interval {@link #INTERVAL}
+	 * and read_consistency_offset_interval {@link #READ_CONSISTENCY_OFFSET_INTERVAL}.
 	 */
 	private <OUT> ContinuousFileMonitoringFunction<OUT> createTestContinuousFileMonitoringFunction(FileInputFormat<OUT> format, FileProcessingMode fileProcessingMode) {
 		ContinuousFileMonitoringFunction<OUT> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, fileProcessingMode, 1, INTERVAL);
+			new ContinuousFileMonitoringFunction<>(format, fileProcessingMode, 1, INTERVAL, READ_CONSISTENCY_OFFSET_INTERVAL);
+		monitoringFunction.setRuntimeContext(Mockito.mock(RuntimeContext.class));

Review comment:
       It is best not to rely on Mockito when writing tests. You could follow the coding guide: https://flink.apache.org/contributing/code-style-and-quality-common.html

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
 	/** Which new data to process (see {@link FileProcessingMode}. */
 	private final FileProcessingMode watchType;
 
-	/** The maximum file modification time seen so far. */
+	/** The offset interval back from the latest file modification timestamp to scan for our-of-order files.
+	 *  Valid value for this is from 0 to Long.MAX_VALUE.
+	 *
+	 *  <p><b>NOTE: </b>: Files with (modTime > Long.MIN_VALUE + readConsistencyOffset) will NOT be read.

Review comment:
       Do you mean modTime  < maxProcessedTime - readConsistencyOffset?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
 	/** Which new data to process (see {@link FileProcessingMode}. */
 	private final FileProcessingMode watchType;
 
-	/** The maximum file modification time seen so far. */
+	/** The offset interval back from the latest file modification timestamp to scan for our-of-order files.

Review comment:
       out-of-order

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
 	/** Which new data to process (see {@link FileProcessingMode}. */
 	private final FileProcessingMode watchType;
 
-	/** The maximum file modification time seen so far. */
+	/** The offset interval back from the latest file modification timestamp to scan for our-of-order files.
+	 *  Valid value for this is from 0 to Long.MAX_VALUE.
+	 *
+	 *  <p><b>NOTE: </b>: Files with (modTime > Long.MIN_VALUE + readConsistencyOffset) will NOT be read.
+	 */
+	private final long readConsistencyOffset;
+
+	/** The current modification time watermark. */
 	private volatile long globalModificationTime = Long.MIN_VALUE;
 
+	/** The maximum file modification time seen so far. */
+	private volatile long maxProcessedTime = Long.MIN_VALUE;

Review comment:
       Construction already initializes the value.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -172,6 +236,26 @@ public void initializeState(FunctionInitializationContext context) throws Except
 					LOG.debug("{} retrieved a global mod time of {}.",
 						getClass().getSimpleName(), globalModificationTime);
 				}
+				if (retrievedStates2.size() == 1 && processedFiles.size() != 0) {

Review comment:
       Maybe I am missing something, but why do we need this check?
   Currently, I find that `processedFiles.size()` is always 0 when `initializeState` is called.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -248,7 +332,19 @@ private void monitorDirAndForwardSplits(FileSystem fs,
 				context.collect(split);
 			}
 			// update the global modification time
-			globalModificationTime = Math.max(globalModificationTime, modificationTime);
+			maxProcessedTime = Math.max(maxProcessedTime, modificationTime);

Review comment:
       I think this line might not be consistent with the comment above it.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -248,7 +332,19 @@ private void monitorDirAndForwardSplits(FileSystem fs,
 				context.collect(split);
 			}
 			// update the global modification time
-			globalModificationTime = Math.max(globalModificationTime, modificationTime);
+			maxProcessedTime = Math.max(maxProcessedTime, modificationTime);
+		}
+		// Populate processed files.
+		// This check is to ensure that globalModificationTime will not go backward
+		// even if readConsistencyOffset is changed to a large value after a restore from checkpoint,
+		// so  files would be processed twice
+		globalModificationTime = Math.max(maxProcessedTime - readConsistencyOffset, globalModificationTime);

Review comment:
       Maybe we could remove this logic if we do not use the globalModificationTime.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
 	/** Which new data to process (see {@link FileProcessingMode}. */
 	private final FileProcessingMode watchType;
 
-	/** The maximum file modification time seen so far. */
+	/** The offset interval back from the latest file modification timestamp to scan for our-of-order files.
+	 *  Valid value for this is from 0 to Long.MAX_VALUE.
+	 *
+	 *  <p><b>NOTE: </b>: Files with (modTime > Long.MIN_VALUE + readConsistencyOffset) will NOT be read.
+	 */
+	private final long readConsistencyOffset;
+
+	/** The current modification time watermark. */
 	private volatile long globalModificationTime = Long.MIN_VALUE;
 
+	/** The maximum file modification time seen so far. */
+	private volatile long maxProcessedTime = Long.MIN_VALUE;
+
+	/** The list of processed files having modification time within the period from globalModificationTime
+	 *  to maxProcessedTime in the form of a Map&lt;filePath, lastModificationTime&gt;. */
+	private volatile Map<String, Long> processedFiles;
+
 	private transient Object checkpointLock;
 
 	private volatile boolean isRunning = true;
 
 	private transient ListState<Long> checkpointedState;

Review comment:
       Do we still need this state in the current implementation?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
 	/** Which new data to process (see {@link FileProcessingMode}. */
 	private final FileProcessingMode watchType;
 
-	/** The maximum file modification time seen so far. */
+	/** The offset interval back from the latest file modification timestamp to scan for our-of-order files.
+	 *  Valid value for this is from 0 to Long.MAX_VALUE.
+	 *
+	 *  <p><b>NOTE: </b>: Files with (modTime > Long.MIN_VALUE + readConsistencyOffset) will NOT be read.
+	 */
+	private final long readConsistencyOffset;
+
+	/** The current modification time watermark. */
 	private volatile long globalModificationTime = Long.MIN_VALUE;

Review comment:
       Could we avoid depending on this variable? I think that would reduce the complexity of the restore logic.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -376,9 +479,12 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
 
 		this.checkpointedState.clear();
 		this.checkpointedState.add(this.globalModificationTime);

Review comment:
       see above







[GitHub] [flink] guoweiM commented on pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
guoweiM commented on pull request #6613:
URL: https://github.com/apache/flink/pull/6613#issuecomment-698073535


   Sorry for the late reply. Thanks @lvhuyen very much for resolving the comments.
   
   My concern is why we need both `globalModificationTime` and `maxProcessedTime` at the same time. We could always compute `globalModificationTime` from `maxProcessedTime`. IMHO, reducing the mutable state would make the code easier to maintain and understand.
   
   Of course, maybe I am missing a scenario that needs both states. Could you explain why we still need these two variables at the same time?
   
   Thanks again for your patience.
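
The trade-off under discussion can be sketched as follows. This is an illustration only, not code from the PR: the class `WatermarkSketch` and the methods `derived` and `guarded` are hypothetical names. The `guarded` variant mirrors the `Math.max(maxProcessedTime - readConsistencyOffset, globalModificationTime)` line quoted in the review, whose code comment says the stored value keeps the lower bound from moving backward when `readConsistencyOffset` is increased after a restore from a checkpoint.

```java
/** Hypothetical helper comparing a derived lower bound with a stored one. */
final class WatermarkSketch {
    private WatermarkSketch() {}

    /** Lower bound computed purely from maxProcessedTime (saturating at Long.MIN_VALUE). */
    static long derived(long maxProcessedTime, long readConsistencyOffset) {
        if (maxProcessedTime < Long.MIN_VALUE + readConsistencyOffset) {
            return Long.MIN_VALUE;
        }
        return maxProcessedTime - readConsistencyOffset;
    }

    /** Lower bound that never drops below a previously stored (for example restored) value. */
    static long guarded(long storedLowerBound, long maxProcessedTime, long readConsistencyOffset) {
        return Math.max(derived(maxProcessedTime, readConsistencyOffset), storedLowerBound);
    }
}
```

For example, with `maxProcessedTime = 100` and an offset of 10 the derived bound is 90. If the job is restored with an offset of 50, the derived bound drops to 50, whereas the guarded bound stays at the stored 90. Whether that scenario justifies keeping both variables is the open question in this thread.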





[GitHub] [flink] flinkbot edited a comment on pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #6613:
URL: https://github.com/apache/flink/pull/6613#issuecomment-526964862


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "85434111363874caddb161d5f49f22711397ba55",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/125372744",
       "triggerID" : "85434111363874caddb161d5f49f22711397ba55",
       "triggerType" : "PUSH"
     }, {
       "hash" : "85434111363874caddb161d5f49f22711397ba55",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/125372744",
       "triggerID" : "526964862",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "b5aad1232ad0cc45e45299dc8480ed57f9ddfa55",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6259",
       "triggerID" : "b5aad1232ad0cc45e45299dc8480ed57f9ddfa55",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4d48ab513b2b38147013f2b4b824493188fb9269",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6260",
       "triggerID" : "4d48ab513b2b38147013f2b4b824493188fb9269",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b5aad1232ad0cc45e45299dc8480ed57f9ddfa55 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6259) 
   * 4d48ab513b2b38147013f2b4b824493188fb9269 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6260) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] guoweiM commented on pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
guoweiM commented on pull request #6613:
URL: https://github.com/apache/flink/pull/6613#issuecomment-773980912


   As noted on the PR, this problem does not exist in the new FileSource, so I am closing the PR.





[GitHub] [flink] lvhuyen commented on pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
lvhuyen commented on pull request #6613:
URL: https://github.com/apache/flink/pull/6613#issuecomment-677969037


   Thank you, Kostas. I have been waiting for Guowei's reply on the Jira ticket, as it might render my PR redundant.
   
   BTW, may I ask how you monitor the activity of a PR? My PR was created almost 2 years ago, but most of the time it stayed in the "waiting for review" state.
   Thanks.
   Averell Huyen Levan
   
       On Thursday, August 20, 2020, 07:43:22 PM GMT+10, Kostas Kloudas <no...@github.com> wrote:  
    
    
   
   
   Sorry @lvhuyen for closing this, I am reopening it because I just noticed there is discussion on the related JIRA.
   





[GitHub] [flink] kl0u commented on pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

Posted by GitBox <gi...@apache.org>.
kl0u commented on pull request #6613:
URL: https://github.com/apache/flink/pull/6613#issuecomment-677491493


   I'm closing this as "Abandoned", since there is no more activity.
   

