Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/27 08:44:29 UTC

[GitHub] [flink] guoweiM commented on a change in pull request #6613: [FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring

guoweiM commented on a change in pull request #6613:
URL: https://github.com/apache/flink/pull/6613#discussion_r460665955



##########
File path: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
##########
@@ -1054,11 +1095,22 @@ private static int getLineNo(String line) {
 	}
 
 	/**
-	 * Create continuous monitoring function with 1 reader-parallelism and interval: {@link #INTERVAL}.
+	 * Create continuous monitoring function with 1 reader-parallelism, interval {@link #INTERVAL}
+	 * and read_consistency_offset_interval {@link #READ_CONSISTENCY_OFFSET_INTERVAL}.
 	 */
 	private <OUT> ContinuousFileMonitoringFunction<OUT> createTestContinuousFileMonitoringFunction(FileInputFormat<OUT> format, FileProcessingMode fileProcessingMode) {
 		ContinuousFileMonitoringFunction<OUT> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, fileProcessingMode, 1, INTERVAL);
+			new ContinuousFileMonitoringFunction<>(format, fileProcessingMode, 1, INTERVAL, READ_CONSISTENCY_OFFSET_INTERVAL);
+		monitoringFunction.setRuntimeContext(Mockito.mock(RuntimeContext.class));

Review comment:
       It is best not to rely on Mockito when writing tests. Please follow the Flink coding guidelines: https://flink.apache.org/contributing/code-style-and-quality-common.html

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
 	/** Which new data to process (see {@link FileProcessingMode}. */
 	private final FileProcessingMode watchType;
 
-	/** The maximum file modification time seen so far. */
+	/** The offset interval back from the latest file modification timestamp to scan for our-of-order files.
+	 *  Valid value for this is from 0 to Long.MAX_VALUE.
+	 *
+	 *  <p><b>NOTE: </b>: Files with (modTime > Long.MIN_VALUE + readConsistencyOffset) will NOT be read.

Review comment:
       Do you mean `modTime < maxProcessedTime - readConsistencyOffset`?
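A minimal sketch of the rule as the question above reads it, assuming a file is skipped when `modTime < maxProcessedTime - readConsistencyOffset`, or when the same path was already processed with the same modification time. The class `OutOfOrderFileFilter` and its methods are hypothetical and not the PR's code. Because `maxProcessedTime` starts at `Long.MIN_VALUE`, the subtraction is saturated here so it cannot wrap around to a large positive value.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the scan-window rule; not the code under review. */
class OutOfOrderFileFilter {
    private final long readConsistencyOffset;
    private long maxProcessedTime = Long.MIN_VALUE;
    // path -> modification time at which the file was processed
    private final Map<String, Long> processedFiles = new HashMap<>();

    OutOfOrderFileFilter(long readConsistencyOffset) {
        if (readConsistencyOffset < 0) {
            throw new IllegalArgumentException("readConsistencyOffset must be >= 0");
        }
        this.readConsistencyOffset = readConsistencyOffset;
    }

    /** Lower bound of the scan window, saturating at Long.MIN_VALUE instead of overflowing. */
    long lowerBound() {
        // Long.MIN_VALUE + offset cannot overflow for offset in [0, Long.MAX_VALUE].
        if (maxProcessedTime < Long.MIN_VALUE + readConsistencyOffset) {
            return Long.MIN_VALUE;
        }
        return maxProcessedTime - readConsistencyOffset;
    }

    /** True if the file is older than the window or was already read at this modTime. */
    boolean shouldIgnore(String path, long modTime) {
        if (modTime < lowerBound()) {
            return true;
        }
        Long seen = processedFiles.get(path);
        return seen != null && seen == modTime;
    }

    /** Records a processed file and advances maxProcessedTime. */
    void markProcessed(String path, long modTime) {
        processedFiles.put(path, modTime);
        maxProcessedTime = Math.max(maxProcessedTime, modTime);
    }
}
```

With an offset of 100 and one file processed at time 1000, files modified before 900 are skipped, while a new file at 950, or the same path rewritten at 1001, is still picked up.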

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
 	/** Which new data to process (see {@link FileProcessingMode}. */
 	private final FileProcessingMode watchType;
 
-	/** The maximum file modification time seen so far. */
+	/** The offset interval back from the latest file modification timestamp to scan for our-of-order files.

Review comment:
       Typo: "our-of-order" should be "out-of-order".

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
 	/** Which new data to process (see {@link FileProcessingMode}. */
 	private final FileProcessingMode watchType;
 
-	/** The maximum file modification time seen so far. */
+	/** The offset interval back from the latest file modification timestamp to scan for our-of-order files.
+	 *  Valid value for this is from 0 to Long.MAX_VALUE.
+	 *
+	 *  <p><b>NOTE: </b>: Files with (modTime > Long.MIN_VALUE + readConsistencyOffset) will NOT be read.
+	 */
+	private final long readConsistencyOffset;
+
+	/** The current modification time watermark. */
 	private volatile long globalModificationTime = Long.MIN_VALUE;
 
+	/** The maximum file modification time seen so far. */
+	private volatile long maxProcessedTime = Long.MIN_VALUE;

Review comment:
       The constructor already initializes this value.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -172,6 +236,26 @@ public void initializeState(FunctionInitializationContext context) throws Except
 					LOG.debug("{} retrieved a global mod time of {}.",
 						getClass().getSimpleName(), globalModificationTime);
 				}
+				if (retrievedStates2.size() == 1 && processedFiles.size() != 0) {

Review comment:
       Maybe I'm missing something, but why do we need this check?
   As far as I can tell, `processedFiles.size()` is always 0 when `initializeState` is called.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -248,7 +332,19 @@ private void monitorDirAndForwardSplits(FileSystem fs,
 				context.collect(split);
 			}
 			// update the global modification time
-			globalModificationTime = Math.max(globalModificationTime, modificationTime);
+			maxProcessedTime = Math.max(maxProcessedTime, modificationTime);

Review comment:
       I think this line may no longer be consistent with the comment above it ("update the global modification time"), since it now updates `maxProcessedTime`.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -248,7 +332,19 @@ private void monitorDirAndForwardSplits(FileSystem fs,
 				context.collect(split);
 			}
 			// update the global modification time
-			globalModificationTime = Math.max(globalModificationTime, modificationTime);
+			maxProcessedTime = Math.max(maxProcessedTime, modificationTime);
+		}
+		// Populate processed files.
+		// This check is to ensure that globalModificationTime will not go backward
+		// even if readConsistencyOffset is changed to a large value after a restore from checkpoint,
+		// so  files would be processed twice
+		globalModificationTime = Math.max(maxProcessedTime - readConsistencyOffset, globalModificationTime);

Review comment:
       Maybe we could remove this logic if we no longer use `globalModificationTime`.
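One possible shape for this, sketched under the assumption that `processedFiles` only needs to hold files inside the window `[maxProcessedTime - readConsistencyOffset, maxProcessedTime]`: the lower bound is computed on the fly at pruning time instead of being kept in a separate `globalModificationTime` field. `ProcessedFilesWindow`, `markProcessed` and `prune` are hypothetical names, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: prune processed files without a stored watermark field. */
class ProcessedFilesWindow {
    private final long readConsistencyOffset;
    // path -> modification time at which the file was processed
    private final Map<String, Long> processedFiles = new HashMap<>();
    private long maxProcessedTime = Long.MIN_VALUE;

    ProcessedFilesWindow(long readConsistencyOffset) {
        this.readConsistencyOffset = readConsistencyOffset;
    }

    void markProcessed(String path, long modTime) {
        processedFiles.put(path, modTime);
        maxProcessedTime = Math.max(maxProcessedTime, modTime);
    }

    /** Drops entries older than the window; intended to run once per monitoring pass. */
    void prune() {
        // Saturate instead of overflowing while maxProcessedTime is still Long.MIN_VALUE.
        long lowerBound = maxProcessedTime < Long.MIN_VALUE + readConsistencyOffset
                ? Long.MIN_VALUE
                : maxProcessedTime - readConsistencyOffset;
        processedFiles.values().removeIf(modTime -> modTime < lowerBound);
    }

    int size() {
        return processedFiles.size();
    }
}
```

The trade-off: without a persisted, monotonic bound, raising `readConsistencyOffset` after a restore widens the window backward, so files that were already pruned could be read a second time. That is the case the `Math.max` guard in the diff protects against, so dropping it would need either a documented restriction on changing the offset or another safeguard.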

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
 	/** Which new data to process (see {@link FileProcessingMode}. */
 	private final FileProcessingMode watchType;
 
-	/** The maximum file modification time seen so far. */
+	/** The offset interval back from the latest file modification timestamp to scan for our-of-order files.
+	 *  Valid value for this is from 0 to Long.MAX_VALUE.
+	 *
+	 *  <p><b>NOTE: </b>: Files with (modTime > Long.MIN_VALUE + readConsistencyOffset) will NOT be read.
+	 */
+	private final long readConsistencyOffset;
+
+	/** The current modification time watermark. */
 	private volatile long globalModificationTime = Long.MIN_VALUE;
 
+	/** The maximum file modification time seen so far. */
+	private volatile long maxProcessedTime = Long.MIN_VALUE;
+
+	/** The list of processed files having modification time within the period from globalModificationTime
+	 *  to maxProcessedTime in the form of a Map&lt;filePath, lastModificationTime&gt;. */
+	private volatile Map<String, Long> processedFiles;
+
 	private transient Object checkpointLock;
 
 	private volatile boolean isRunning = true;
 
 	private transient ListState<Long> checkpointedState;

Review comment:
       Do we still need this state in the current implementation?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
 	/** Which new data to process (see {@link FileProcessingMode}. */
 	private final FileProcessingMode watchType;
 
-	/** The maximum file modification time seen so far. */
+	/** The offset interval back from the latest file modification timestamp to scan for our-of-order files.
+	 *  Valid value for this is from 0 to Long.MAX_VALUE.
+	 *
+	 *  <p><b>NOTE: </b>: Files with (modTime > Long.MIN_VALUE + readConsistencyOffset) will NOT be read.
+	 */
+	private final long readConsistencyOffset;
+
+	/** The current modification time watermark. */
 	private volatile long globalModificationTime = Long.MIN_VALUE;

Review comment:
       Is it possible to avoid depending on this variable? I think that would reduce the complexity of the restore logic.
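A sketch of how the restore path might look if only the processed-files map were checkpointed, assuming pruning always keeps every entry with `modTime >= maxProcessedTime - readConsistencyOffset`. Under that assumption the newest file is never pruned, so `maxProcessedTime` can be recomputed as the maximum over the restored entries. `ProcessedFilesState`, `snapshot` and `restore` are hypothetical stand-ins for what would go into `snapshotState`/`initializeState`; plain lists are used here instead of Flink's `ListState` to keep the example self-contained.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: derive maxProcessedTime from the checkpointed map on restore. */
class ProcessedFilesState {
    final Map<String, Long> processedFiles = new HashMap<>();
    long maxProcessedTime = Long.MIN_VALUE;

    /** Stand-in for writing the map's entries into operator state. */
    List<Map.Entry<String, Long>> snapshot() {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : processedFiles.entrySet()) {
            out.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        return out;
    }

    /** Stand-in for reading the entries back; no separate timestamp state is needed. */
    static ProcessedFilesState restore(List<Map.Entry<String, Long>> entries) {
        ProcessedFilesState state = new ProcessedFilesState();
        for (Map.Entry<String, Long> e : entries) {
            state.processedFiles.put(e.getKey(), e.getValue());
            // The newest entry survives pruning, so this max equals the pre-snapshot value.
            state.maxProcessedTime = Math.max(state.maxProcessedTime, e.getValue());
        }
        return state;
    }
}
```

An empty snapshot restores to `Long.MIN_VALUE`, which matches the state of a function that has not processed any file yet.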

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -376,9 +479,12 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
 
 		this.checkpointedState.clear();
 		this.checkpointedState.add(this.globalModificationTime);

Review comment:
       See above.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org