You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/06/30 07:38:50 UTC
[flink] branch master updated: [FLINK-28047][api] Deprecate StreamExecutionEnvironment#readFile()/readTextFile() methods
This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3e73fb543eb [FLINK-28047][api] Deprecate StreamExecutionEnvironment#readFile()/readTextFile() methods
3e73fb543eb is described below
commit 3e73fb543eb7540a28838bcefe07fb72157a9731
Author: Alexander Fedulov <14...@users.noreply.github.com>
AuthorDate: Wed Jun 22 16:59:08 2022 +0200
[FLINK-28047][api] Deprecate StreamExecutionEnvironment#readFile()/readTextFile() methods
---
.../examples/iteration/IterateExample.java | 11 ++++-
.../examples/sideoutput/SideOutputExample.java | 27 +++++++----
.../environment/StreamExecutionEnvironment.java | 52 ++++++++++++++++++++++
3 files changed, 80 insertions(+), 10 deletions(-)
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index fa261cc6e3d..a38a090505c 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.examples.iteration;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -24,6 +25,8 @@ import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -83,7 +86,13 @@ public class IterateExample {
// create input stream of integer pairs
DataStream<Tuple2<Integer, Integer>> inputStream;
if (params.has("input")) {
- inputStream = env.readTextFile(params.get("input")).map(new FibonacciInputMap());
+ FileSource<String> fileSource =
+ FileSource.forRecordStreamFormat(
+ new TextLineInputFormat(), new Path(params.get("input")))
+ .build();
+ inputStream =
+ env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "Tuples Source")
+ .map(new FibonacciInputMap());
} else {
System.out.println("Executing Iterate example with default input data set.");
System.out.println("Use --input to specify file input.");
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
index 8576fb4bef4..7439a33566c 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
@@ -29,8 +29,11 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -70,23 +73,29 @@ public class SideOutputExample {
env.getConfig().setGlobalJobParameters(params);
// get input data
- DataStream<String> text;
+ DataStream<String> textWithTimestampAndWatermark;
if (params.has("input")) {
// read the text file from given input path
- text = env.readTextFile(params.get("input"));
+ FileSource<String> fileSource =
+ FileSource.forRecordStreamFormat(
+ new TextLineInputFormat(), new Path(params.get("input")))
+ .build();
+ textWithTimestampAndWatermark =
+ env.fromSource(
+ fileSource, IngestionTimeWatermarkStrategy.create(), "Words Source");
} else {
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
// get default test text data
- text = env.fromElements(WordCountData.WORDS);
+ DataStreamSource<String> text = env.fromElements(WordCountData.WORDS);
+ // We assign the WatermarkStrategy after creating the source because the
+ // StreamExecutionEnvironment#fromElements() methods currently do not accept
+ // WatermarkStrategies. In a real-world job you should integrate the WatermarkStrategy
+ // into the source, as shown above for the FileSource.
+ textWithTimestampAndWatermark =
+ text.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());
}
- // We assign the WatermarkStrategy after creating the source. In a real-world job you
- // should integrate the WatermarkStrategy in the source. The Kafka source allows this,
- // for example.
- SingleOutputStreamOperator<String> textWithTimestampAndWatermark =
- text.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());
-
SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized =
textWithTimestampAndWatermark.process(new Tokenizer());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 54dca343f61..035bb33eff6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1345,7 +1345,17 @@ public class StreamExecutionEnvironment {
* @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
* "hdfs://host:port/file/path").
* @return The data stream that represents the data read from the given file as text lines
+ * @deprecated Use {@code
+ * FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat()} instead. An
+ * example of reading a file using a simple {@code TextLineInputFormat}:
+ * <pre>{@code
+ * FileSource<String> source =
+ * FileSource.forRecordStreamFormat(
+ * new TextLineInputFormat(), new Path("/foo/bar"))
+ * .build();
+ * }</pre>
*/
+ @Deprecated
public DataStreamSource<String> readTextFile(String filePath) {
return readTextFile(filePath, "UTF-8");
}
@@ -1365,7 +1375,17 @@ public class StreamExecutionEnvironment {
* "hdfs://host:port/file/path")
* @param charsetName The name of the character set used to read the file
* @return The data stream that represents the data read from the given file as text lines
+ * @deprecated Use {@code
+ * FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat()} instead. An
+ * example of reading a file using a simple {@code TextLineInputFormat}:
+ * <pre>{@code
+ * FileSource<String> source =
+ * FileSource.forRecordStreamFormat(
+ * new TextLineInputFormat("UTF-8"), new Path("/foo/bar"))
+ * .build();
+ * }</pre>
*/
+ @Deprecated
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
Preconditions.checkArgument(
!StringUtils.isNullOrWhitespaceOnly(filePath),
@@ -1402,7 +1422,17 @@ public class StreamExecutionEnvironment {
* @param inputFormat The input format used to create the data stream
* @param <OUT> The type of the returned data stream
* @return The data stream that represents the data read from the given file
+ * @deprecated Use {@code
+ * FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat()} instead. An
+ * example of reading a file using a simple {@code TextLineInputFormat}:
+ * <pre>{@code
+ * FileSource<String> source =
+ * FileSource.forRecordStreamFormat(
+ * new TextLineInputFormat(), new Path("/foo/bar"))
+ * .build();
+ * }</pre>
*/
+ @Deprecated
public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath) {
return readFile(inputFormat, filePath, FileProcessingMode.PROCESS_ONCE, -1);
}
@@ -1482,7 +1512,18 @@ public class StreamExecutionEnvironment {
* millis) between consecutive path scans
* @param <OUT> The type of the returned data stream
* @return The data stream that represents the data read from the given file
+ * @deprecated Use {@code
+ * FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat()} instead. An
+ * example of reading a file using a simple {@code TextLineInputFormat}:
+ * <pre>{@code
+ * FileSource<String> source =
+ * FileSource.forRecordStreamFormat(
+ * new TextLineInputFormat(), new Path("/foo/bar"))
+ * .monitorContinuously(Duration.ofSeconds(10))
+ * .build();
+ * }</pre>
*/
+ @Deprecated
@PublicEvolving
public <OUT> DataStreamSource<OUT> readFile(
FileInputFormat<OUT> inputFormat,
@@ -1557,7 +1598,18 @@ public class StreamExecutionEnvironment {
* millis) between consecutive path scans
* @param <OUT> The type of the returned data stream
* @return The data stream that represents the data read from the given file
+ * @deprecated Use {@code
+ * FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat()} instead. An
+ * example of reading a file using a simple {@code TextLineInputFormat}:
+ * <pre>{@code
+ * FileSource<String> source =
+ * FileSource.forRecordStreamFormat(
+ * new TextLineInputFormat(), new Path("/foo/bar"))
+ * .monitorContinuously(Duration.ofSeconds(10))
+ * .build();
+ * }</pre>
*/
+ @Deprecated
@PublicEvolving
public <OUT> DataStreamSource<OUT> readFile(
FileInputFormat<OUT> inputFormat,