Posted to dev@flink.apache.org by "ChangjiGuo (Jira)" <ji...@apache.org> on 2021/04/28 03:32:00 UTC

[jira] [Created] (FLINK-22497) When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed

ChangjiGuo created FLINK-22497:
----------------------------------

             Summary: When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed
                 Key: FLINK-22497
                 URL: https://issues.apache.org/jira/browse/FLINK-22497
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / FileSystem
    Affects Versions: 1.11.2
         Environment: hadoop-2.8.4
            Reporter: ChangjiGuo


I had a question while testing StreamingFileSink:

The default 60s rollover interval of DefaultRollingPolicy is only checked when a processing-time timer registered with procTimeService fires, and that timer fires once per bucket check interval (also 60s by default). If the rollover interval has not quite elapsed when the timer fires, the roll is postponed to the next timer (60s later), so files are not rolled promptly. For example, with a checkpoint interval of 60s, the part file should be finished at the second checkpoint, but it is delayed until the third.
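To make the delay concrete, here is a small standalone simulation. It is not Flink code, and the class and method names are hypothetical. It assumes timers fire at open time + k * 60s and that a part file is rolled at the first timer where its age is at least 60s.

{code:java}
public class TimerRollSimulation {
    // Hypothetical constants modelled on the 60s defaults described above.
    static final long ROLLOVER_INTERVAL_MS = 60_000L;
    static final long CHECK_INTERVAL_MS = 60_000L;

    /** First timer firing time at which a part file created at createdMs would be rolled. */
    static long firstTimerRoll(long openMs, long createdMs) {
        long timer = openMs + CHECK_INTERVAL_MS; // first timer after the operator opens
        while (timer - createdMs < ROLLOVER_INTERVAL_MS) {
            timer += CHECK_INTERVAL_MS; // not old enough yet: wait for the next timer
        }
        return timer;
    }

    public static void main(String[] args) {
        long open = 0L;
        long created = 1_000L; // first element arrives 1s after the operator opens
        System.out.println("interval elapsed at " + (created + ROLLOVER_INTERVAL_MS) + " ms");
        System.out.println("rolled by timer at " + firstTimerRoll(open, created) + " ms");
    }
}
{code}

Under these assumptions the file is old enough at 61000 ms but is only rolled at 120000 ms, almost a full check interval late.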

See the attached picture for details.

If we add rollingPolicy.shouldRollOnProcessingTime to the if condition in the Bucket.write method, the file is finished at the second checkpoint, as expected:
{code:java}
void write(IN element, long currentTime) throws IOException {
    // Proposed change: also roll when the rollover interval has already elapsed,
    // instead of waiting for the next processing-time timer to fire.
    if (inProgressPart == null
            || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
            || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                    subtaskIndex, bucketId, element);
        }
        rollPartFile(currentTime);
    }
    inProgressPart.write(element, currentTime);
}
{code}
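The effect of the extra condition can be sketched the same way. This is again a hypothetical standalone model, not the actual Bucket class, and it assumes the processing-time check is "age >= rollover interval". With the check on the write path, the file is rolled by the first element that arrives after the interval has elapsed.

{code:java}
import java.util.List;

public class WriteRollSimulation {
    // Hypothetical constant mirroring a 60s rollover interval.
    static final long ROLLOVER_INTERVAL_MS = 60_000L;

    /**
     * Time of the first write that would roll a part file created at createdMs,
     * or -1 if no write reaches the rollover interval.
     */
    static long firstWriteRoll(long createdMs, List<Long> writeTimesMs) {
        for (long t : writeTimesMs) {
            // Modelled on the rollover-interval part of the processing-time check.
            if (t - createdMs >= ROLLOVER_INTERVAL_MS) {
                return t;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long created = 1_000L;
        List<Long> writes = List.of(11_000L, 31_000L, 61_000L, 71_000L);
        System.out.println("rolled at " + firstWriteRoll(created, writes) + " ms");
    }
}
{code}

With elements arriving at 11000, 31000, 61000 and 71000 ms, this prints "rolled at 61000 ms". The write-path check only runs when elements arrive, so a bucket that stops receiving data would still rely on the processing-time timer to be rolled.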
 
Is my understanding correct? 
Thanks! ^_^



--
This message was sent by Atlassian Jira
(v8.3.4#803005)