Posted to issues@flink.apache.org by bowenli86 <gi...@git.apache.org> on 2018/06/07 00:17:35 UTC

[GitHub] flink pull request #6130: [FLINK-9545] Support read a file multiple times in...

GitHub user bowenli86 opened a pull request:

    https://github.com/apache/flink/pull/6130

    [FLINK-9545] Support read a file multiple times in Flink DataStream

    ## What is the purpose of the change
    
    We need `StreamExecutionEnvironment.readFile/readTextFile` to read each file N times, but currently it only supports reading a file once.
    
    This PR adds support for that feature.
    
    ## Brief change log
    
    - Add a new processing mode, `PROCESS_N_TIMES`
    - Add an additional parameter, `numTimes`, to `StreamExecutionEnvironment.readFile/readTextFile`
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as *(please describe tests)*.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (docs / JavaDocs)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bowenli86/flink FLINK-9545

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6130.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6130
    
----
commit d51fd25ca0ff8e38aaf84d2076c9c979cd136c9d
Author: Bowen Li <bo...@...>
Date:   2018-06-07T00:12:59Z

    [FLINK-9545] Support read a file multiple times in Flink DataStream

----


---

[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/6130
  
    Hi @bowenli86, 
    @zentol, @aljoscha and I all seem to have doubts about the utility of the feature.
    
    Given this, and to keep JIRA and the list of PRs we have to work on clean, I would suggest 
    closing the PR and the related issue.



---

[GitHub] flink pull request #6130: [FLINK-9545] Support read a file multiple times in...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6130#discussion_r193637758
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -1093,6 +1178,59 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
     		return readFile(inputFormat, filePath, watchType, interval, typeInformation);
     	}
     
    +	/**
    +	 * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}. Depending
    +	 * on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms) the path
    +	 * for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), process once or {@code numTimes} times the data currently in
    +	 * the path and exit ({@link FileProcessingMode#PROCESS_ONCE} or {@link FileProcessingMode#PROCESS_N_TIMES}).
    +	 * In addition, if the path contains files not to be processed, the user can specify a custom {@link FilePathFilter}.
    +	 * As a default implementation you can use {@link FilePathFilter#createDefaultFilter()}.
    +	 *
    +	 * <p>Since all data streams need specific information about their types, this method needs to determine the
    +	 * type of the data produced by the input format. It will attempt to determine the data type by reflection,
    +	 * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
    +	 * In the latter case, this method will invoke the
    +	 * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine data
    +	 * type produced by the input format.
    +	 *
    +	 * <p><b>NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE} or {@link FileProcessingMode#PROCESS_N_TIMES} ,
    +	 * the source monitors the path <b>once</b>, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
    +	 * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
    +	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
    +	 * are going to be forwarded after the source exits, thus having no checkpoints after that point.
    +	 *
    +	 * @param inputFormat
    +	 * 		The input format used to create the data stream
    +	 * @param filePath
    +	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
    +	 * @param watchType
    +	 * 		The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
    +	 * @param interval
    +	 * 		In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
    +	 * @param <OUT>
    +	 * 		The type of the returned data stream
    +	 * @param numTimes
    +	 * 		The number of times to read the file
    +	 * @return The data stream that represents the data read from the given file
    +	 */
    +	@PublicEvolving
    +	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
    +												String filePath,
    +												FileProcessingMode watchType,
    +												long interval,
    +												int numTimes) {
    +
    +		TypeInformation<OUT> typeInformation;
    +		try {
    +			typeInformation = TypeExtractor.getInputFormatTypes(inputFormat);
    +		} catch (Exception e) {
    +			throw new InvalidProgramException("The type returned by the input format could not be " +
    +				"automatically determined. Please specify the TypeInformation of the produced type " +
    +				"explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
    --- End diff --
    
    Maybe it would be better not to swallow the exception here.
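    A minimal sketch of the suggestion: pass the caught exception as the cause instead of dropping it. It assumes Flink's `InvalidProgramException` accepts a `(String, Throwable)` cause; a local stand-in class is used so the snippet compiles without Flink, and `extractType` is a hypothetical placeholder for `TypeExtractor.getInputFormatTypes` that always fails:

```java
// Hypothetical stand-in for org.apache.flink.api.common.InvalidProgramException,
// so that this sketch compiles without Flink on the classpath.
class InvalidProgramException extends RuntimeException {
    InvalidProgramException(String message, Throwable cause) {
        super(message, cause);
    }
}

class TypeExtractionSketch {

    // Hypothetical stand-in for TypeExtractor.getInputFormatTypes(inputFormat):
    // it always fails, to simulate a format whose type cannot be inferred.
    static String extractType(Object inputFormat) {
        throw new IllegalStateException("Could not infer type of " + inputFormat);
    }

    static String typeOf(Object inputFormat) {
        try {
            return extractType(inputFormat);
        } catch (Exception e) {
            // Pass the original exception as the cause instead of swallowing it,
            // so its message and stack trace reach the user.
            throw new InvalidProgramException("The type returned by the input format could not be "
                + "automatically determined. Please specify the TypeInformation of the produced type "
                + "explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.", e);
        }
    }

    public static void main(String[] args) {
        try {
            typeOf("myFormat");
        } catch (InvalidProgramException e) {
            System.out.println("cause: " + e.getCause().getMessage());
        }
    }
}
```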


---

[GitHub] flink pull request #6130: [FLINK-9545] Support read a file multiple times in...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6130#discussion_r193638907
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -1355,26 +1541,39 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
     														TypeInformation<OUT> typeInfo,
     														String sourceName,
     														FileProcessingMode monitoringMode,
    -														long interval) {
    +														long interval,
    +														int numTimes) {
     
     		Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
     		Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
     		Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
     		Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");
     
    -		Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
    -				interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
    -			"The path monitoring interval cannot be less than " +
    -					ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");
    +		switch (monitoringMode) {
    +			case PROCESS_ONCE:
    +				Preconditions.checkArgument(numTimes == 1,
    +					"The specified number of times to read a file should be 1, but is " + numTimes);
    +				break;
    +			case PROCESS_N_TIMES:
    +				Preconditions.checkArgument(numTimes >= 1,
    +					"The specified number of times to read a file should be no less than 1, but is " + numTimes);
    +				break;
    +			case PROCESS_CONTINUOUSLY:
    +				Preconditions.checkArgument(interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
    +					String.format("The path monitoring interval cannot be less than %d ms in %s mode.",
    +						ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL, FileProcessingMode.PROCESS_CONTINUOUSLY));
    +				break;
    +			default:
    --- End diff --
    
    How about raising an `IllegalArgumentException` here?
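    A sketch of the validation switch with a failing `default` branch. The enum below is a local mirror of the proposed `FileProcessingMode` (the `PROCESS_N_TIMES` constant only exists in this PR), and the value of `MIN_MONITORING_INTERVAL` is an assumption standing in for `ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL`:

```java
// Hypothetical mirror of FileProcessingMode as proposed in this PR;
// PROCESS_N_TIMES is the new constant and is not part of upstream Flink.
enum FileProcessingMode { PROCESS_ONCE, PROCESS_N_TIMES, PROCESS_CONTINUOUSLY }

class MonitoringModeValidation {

    // Assumed value, standing in for ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL.
    static final long MIN_MONITORING_INTERVAL = 1L;

    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    static void validate(FileProcessingMode monitoringMode, long interval, int numTimes) {
        switch (monitoringMode) {
            case PROCESS_ONCE:
                checkArgument(numTimes == 1,
                    "The specified number of times to read a file should be 1, but is " + numTimes);
                break;
            case PROCESS_N_TIMES:
                checkArgument(numTimes >= 1,
                    "The specified number of times to read a file should be at least 1, but is " + numTimes);
                break;
            case PROCESS_CONTINUOUSLY:
                checkArgument(interval >= MIN_MONITORING_INTERVAL,
                    String.format("The path monitoring interval cannot be less than %d ms in %s mode.",
                        MIN_MONITORING_INTERVAL, FileProcessingMode.PROCESS_CONTINUOUSLY));
                break;
            default:
                // Fails loudly if a new mode is added to the enum but not handled above.
                throw new IllegalArgumentException("Unsupported file processing mode: " + monitoringMode);
        }
    }

    public static void main(String[] args) {
        validate(FileProcessingMode.PROCESS_N_TIMES, -1, 3); // accepted
        try {
            validate(FileProcessingMode.PROCESS_N_TIMES, -1, 0);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

    With every current constant handled, the `default` branch is only reached if a mode is later added to the enum without updating this switch, which is exactly when failing fast helps.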


---

[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/6130
  
    I asked on the Jira issue (https://issues.apache.org/jira/browse/FLINK-9545?focusedCommentId=16504451&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16504451): what's the motivation for this feature?


---

[GitHub] flink pull request #6130: [FLINK-9545] Support read a file multiple times in...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6130#discussion_r193638176
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -1161,7 +1301,53 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
     		Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
     
     		inputFormat.setFilePath(filePath);
    -		return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval);
    +		return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval, 1);
    +	}
    +
    +	/**
    +	 * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}.
    +	 * Depending on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms)
    +	 * the path for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), process once or {@code numTimes} times the data currently in
    +	 * the path and exit ({@link FileProcessingMode#PROCESS_ONCE} or {@link FileProcessingMode#PROCESS_N_TIMES}).
    +	 * In addition, if the path contains files not to be processed, the user can specify a custom {@link FilePathFilter}.
    +	 * As a default implementation you can use {@link FilePathFilter#createDefaultFilter()}.
    +	 *
    +	 * <p><b>NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE} or {@link FileProcessingMode#PROCESS_N_TIMES},
    +	 * the source monitors the path <b>once</b>, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
    +	 * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
    +	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
    +	 * are going to be forwarded after the source exits, thus having no checkpoints after that point.
    +	 *
    +	 * @param inputFormat
    +	 * 		The input format used to create the data stream
    +	 * @param filePath
    +	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
    +	 * @param watchType
    +	 * 		The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
    +	 * @param typeInformation
    +	 * 		Information on the type of the elements in the output stream
    +	 * @param interval
    +	 * 		In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
    +	 * @param <OUT>
    +	 * 		The type of the returned data stream
    +	 * @param numTimes
    +	 * 		The number of times to read the file
    +	 * @return The data stream that represents the data read from the given file
    +	 */
    +	@PublicEvolving
    +	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
    +												String filePath,
    +												FileProcessingMode watchType,
    +												long interval,
    +												TypeInformation<OUT> typeInformation,
    +												int numTimes) {
    --- End diff --
    
    Could numTimes be negative? If not, we could use the `@Nonnegative` annotation, or check it in this function.
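    A minimal sketch of the runtime check, assuming `numTimes` must be at least 1 (as `createFileInput` in this PR requires for `PROCESS_N_TIMES`). JSR-305's `javax.annotation.Nonnegative` is only a hint for static-analysis tools: it is not enforced at runtime and still admits 0. `requirePositiveNumTimes` is a hypothetical helper name:

```java
class NumTimesCheck {

    // An explicit check, unlike @Nonnegative, is enforced at runtime and
    // rejects both negative values and 0.
    static int requirePositiveNumTimes(int numTimes) {
        if (numTimes < 1) {
            throw new IllegalArgumentException(
                "The number of times to read the file must be at least 1, but is " + numTimes);
        }
        return numTimes;
    }

    public static void main(String[] args) {
        System.out.println(requirePositiveNumTimes(3));
        try {
            requirePositiveNumTimes(-2);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```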


---

[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/6130
  
    Hi @bowenli86 ! 
    
    Why not add a `flatMap` after `readFile` and, for every incoming element, emit as many copies as you want?
    
    Personally, I am not so fond of adding methods to the public APIs for such specific use cases.
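    A minimal sketch of the suggested `flatMap`, with local stand-ins for Flink's `org.apache.flink.util.Collector` and `org.apache.flink.api.common.functions.FlatMapFunction` so it runs without Flink. `Replicate` and `ReplicateDemo` are hypothetical names, and the list input stands in for the stream produced by `readFile`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal stand-ins for Flink's Collector and FlatMapFunction interfaces.
interface Collector<T> {
    void collect(T record);
}

interface FlatMapFunction<T, O> {
    void flatMap(T value, Collector<O> out) throws Exception;
}

// Emits each incoming element `copies` times in a row.
class Replicate<T> implements FlatMapFunction<T, T> {
    private final int copies;

    Replicate(int copies) {
        if (copies < 1) {
            throw new IllegalArgumentException("copies must be at least 1, but is " + copies);
        }
        this.copies = copies;
    }

    @Override
    public void flatMap(T value, Collector<T> out) {
        for (int i = 0; i < copies; i++) {
            out.collect(value);
        }
    }
}

class ReplicateDemo {
    // Feeds each line through the function and gathers everything it emits.
    static List<String> run(List<String> lines, int copies) {
        List<String> output = new ArrayList<>();
        Replicate<String> fn = new Replicate<>(copies);
        for (String line : lines) {
            fn.flatMap(line, output::add);
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b"), 3)); // [a, a, a, b, b, b]
    }
}
```

    In a real job, `Replicate` would implement Flink's own interfaces and be applied with `flatMap(...)` on the stream returned by `readTextFile`.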


---

[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/6130
  
    @aljoscha 
    
    Motivation: We need to read a bunch of files, each of them multiple times, to feed our streams.
    
    Specifically, we need `StreamExecutionEnvironment.readFile/readTextFile` to be able to read a file a specified number (`N`) of times, but currently it only supports reading a file once.
    
    We've implemented this internally, and it would be good to contribute it back to the community version. This JIRA is to add support for the feature. 


---

[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/6130
  
    Not really. It's not about having N copies of the data. One use case: a file-fed streaming pipeline usually runs very fast and yields inadequate metrics, so users need to run it end-to-end for longer to gather stable metrics and tune all components in the pipeline.
    
    I'll close it if the community is not interested.
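    One concrete way the two approaches differ is record order. The sketch below uses in-memory lists as a stand-in for the file (an assumption: in a real job each pass would re-read from the file system, and parallel readers do not guarantee a global order anyway). `replay` models reading the whole file `numTimes` times in succession, while `replicate` models the per-element `flatMap`; both names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ReplayVsReplicate {

    // Models re-reading the whole file numTimes times, one pass after another.
    static List<String> replay(List<String> fileLines, int numTimes) {
        List<String> out = new ArrayList<>();
        for (int pass = 0; pass < numTimes; pass++) {
            out.addAll(fileLines);
        }
        return out;
    }

    // Models a flatMap that emits each line numTimes times in a row.
    static List<String> replicate(List<String> fileLines, int numTimes) {
        List<String> out = new ArrayList<>();
        for (String line : fileLines) {
            for (int i = 0; i < numTimes; i++) {
                out.add(line);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> file = Arrays.asList("a", "b");
        System.out.println(replay(file, 3));    // [a, b, a, b, a, b]
        System.out.println(replicate(file, 3)); // [a, a, a, b, b, b]
    }
}
```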


---

[GitHub] flink pull request #6130: [FLINK-9545] Support read a file multiple times in...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6130#discussion_r193637932
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -1161,7 +1301,53 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
     		Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
     
     		inputFormat.setFilePath(filePath);
    -		return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval);
    +		return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval, 1);
    +	}
    +
    +	/**
    +	 * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}.
    +	 * Depending on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms)
    +	 * the path for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), process once or {@code numTimes} times the data currently in
    +	 * the path and exit ({@link FileProcessingMode#PROCESS_ONCE} or {@link FileProcessingMode#PROCESS_N_TIMES}).
    +	 * In addition, if the path contains files not to be processed, the user can specify a custom {@link FilePathFilter}.
    +	 * As a default implementation you can use {@link FilePathFilter#createDefaultFilter()}.
    +	 *
    +	 * <p><b>NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE} or {@link FileProcessingMode#PROCESS_N_TIMES},
    +	 * the source monitors the path <b>once</b>, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
    +	 * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
    +	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
    +	 * are going to be forwarded after the source exits, thus having no checkpoints after that point.
    +	 *
    +	 * @param inputFormat
    +	 * 		The input format used to create the data stream
    +	 * @param filePath
    +	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
    +	 * @param watchType
    +	 * 		The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
    +	 * @param typeInformation
    +	 * 		Information on the type of the elements in the output stream
    +	 * @param interval
    +	 * 		In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
    +	 * @param <OUT>
    +	 * 		The type of the returned data stream
    +	 * @param numTimes
    +	 * 		The number of times to read the file
    +	 * @return The data stream that represents the data read from the given file
    +	 */
    +	@PublicEvolving
    +	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
    +												String filePath,
    +												FileProcessingMode watchType,
    +												long interval,
    +												TypeInformation<OUT> typeInformation,
    +												int numTimes) {
    +
    +		Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
    +		Preconditions.checkNotNull(filePath, "The file path must not be null.");
    +		Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
    --- End diff --
    
    Again, it looks like this should be `Preconditions.checkArgument()`.
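    A minimal sketch of why the original check never fires, using local stand-ins with the same contract as `Preconditions.checkNotNull` and `Preconditions.checkArgument`: `filePath.isEmpty()` is autoboxed to a non-null `Boolean`, so `checkNotNull` always passes, even for an empty path:

```java
class PreconditionPitfall {

    // Stand-ins with the same semantics as Flink's Preconditions methods.
    static <T> T checkNotNull(T reference, String message) {
        if (reference == null) {
            throw new NullPointerException(message);
        }
        return reference;
    }

    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Buggy: the boolean is boxed to a non-null Boolean, so this never throws.
    static void buggyCheck(String filePath) {
        checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
    }

    // Intended behavior: reject an empty path.
    static void fixedCheck(String filePath) {
        checkArgument(!filePath.isEmpty(), "The file path must not be empty.");
    }

    public static void main(String[] args) {
        buggyCheck(""); // passes silently
        try {
            fixedCheck("");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```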


---

[GitHub] flink pull request #6130: [FLINK-9545] Support read a file multiple times in...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6130#discussion_r193637243
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -970,6 +991,37 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
     		return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
     	}
     
    +	/**
    +	 * Reads the given file line-by-line and creates a data stream that contains a string with the
    +	 * contents of each such line. The {@link java.nio.charset.Charset} with the given name will be
    +	 * used to read the files.
    +	 *
    +	 * <p><b>NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
    +	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
    +	 * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
    +	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
    +	 * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
    +	 *
    +	 * @param filePath
    +	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
    +	 * @param charsetName
    +	 * 		The name of the character set used to read the file
    +	 * @param numTimes
    +	 * 		The number of times to read the file
    +	 * @return The data stream that represents the data read from the given file as text lines
    +	 */
    +	public DataStreamSource<String> readTextFile(String filePath, String charsetName, int numTimes) {
    +		Preconditions.checkNotNull(filePath, "The file path must not be null.");
    +		Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
    --- End diff --
    
    Should this be `Preconditions.checkArgument(!filePath.isEmpty(), "The file path must not be empty.");`?
    BTW, maybe we could use `Strings.isNullOrEmpty(filePath)` to merge these two checks into one.
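    A minimal sketch of the merged check. `isNullOrEmpty` below is a local stand-in with the same contract as Guava's `com.google.common.base.Strings.isNullOrEmpty`, and `checkFilePath` is a hypothetical helper name:

```java
class FilePathCheck {

    // Same contract as Guava's Strings.isNullOrEmpty.
    static boolean isNullOrEmpty(String s) {
        return s == null || s.isEmpty();
    }

    // A single check covers both the null and the empty case.
    static String checkFilePath(String filePath) {
        if (isNullOrEmpty(filePath)) {
            throw new IllegalArgumentException("The file path must not be null or empty.");
        }
        return filePath;
    }

    public static void main(String[] args) {
        System.out.println(checkFilePath("hdfs://host:port/file/path"));
        try {
            checkFilePath(null);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

    One trade-off: a `null` path now yields an `IllegalArgumentException` instead of the `NullPointerException` that `checkNotNull` throws.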


---

[GitHub] flink pull request #6130: [FLINK-9545] Support read a file multiple times in...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 closed the pull request at:

    https://github.com/apache/flink/pull/6130


---

[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/6130
  
    You're effectively only explaining what this feature is, but not why it is actually needed. We have to gauge whether this feature is useful for other users as well before we decide to maintain it. 


---