You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/06/30 07:38:50 UTC

[flink] branch master updated: [FLINK-28047][api] Deprecate StreamExecutionEnvironment#readFile()/readTextFile() methods

This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e73fb543eb [FLINK-28047][api] Deprecate StreamExecutionEnvironment#readFile()/readTextFile() methods
3e73fb543eb is described below

commit 3e73fb543eb7540a28838bcefe07fb72157a9731
Author: Alexander Fedulov <14...@users.noreply.github.com>
AuthorDate: Wed Jun 22 16:59:08 2022 +0200

    [FLINK-28047][api] Deprecate StreamExecutionEnvironment#readFile()/readTextFile() methods
---
 .../examples/iteration/IterateExample.java         | 11 ++++-
 .../examples/sideoutput/SideOutputExample.java     | 27 +++++++----
 .../environment/StreamExecutionEnvironment.java    | 52 ++++++++++++++++++++++
 3 files changed, 80 insertions(+), 10 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index fa261cc6e3d..a38a090505c 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.examples.iteration;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -24,6 +25,8 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -83,7 +86,13 @@ public class IterateExample {
         // create input stream of integer pairs
         DataStream<Tuple2<Integer, Integer>> inputStream;
         if (params.has("input")) {
-            inputStream = env.readTextFile(params.get("input")).map(new FibonacciInputMap());
+            FileSource<String> fileSource =
+                    FileSource.forRecordStreamFormat(
+                                    new TextLineInputFormat(), new Path(params.get("input")))
+                            .build();
+            inputStream =
+                    env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "Tuples Source")
+                            .map(new FibonacciInputMap());
         } else {
             System.out.println("Executing Iterate example with default input data set.");
             System.out.println("Use --input to specify file input.");
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
index 8576fb4bef4..7439a33566c 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
@@ -29,8 +29,11 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -70,23 +73,29 @@ public class SideOutputExample {
         env.getConfig().setGlobalJobParameters(params);
 
         // get input data
-        DataStream<String> text;
+        DataStream<String> textWithTimestampAndWatermark;
         if (params.has("input")) {
             // read the text file from given input path
-            text = env.readTextFile(params.get("input"));
+            FileSource<String> fileSource =
+                    FileSource.forRecordStreamFormat(
+                                    new TextLineInputFormat(), new Path(params.get("input")))
+                            .build();
+            textWithTimestampAndWatermark =
+                    env.fromSource(
+                            fileSource, IngestionTimeWatermarkStrategy.create(), "Words Source");
         } else {
             System.out.println("Executing WordCount example with default input data set.");
             System.out.println("Use --input to specify file input.");
             // get default test text data
-            text = env.fromElements(WordCountData.WORDS);
+            DataStreamSource<String> text = env.fromElements(WordCountData.WORDS);
+            // We assign the WatermarkStrategy after creating the source because
+            // StreamExecutionEnvironment#fromElements() methods currently do not accept
+            // WatermarkStrategies. In a real-world job you should integrate the WatermarkStrategy
+            // in the source as shown above for the FileSource.
+            textWithTimestampAndWatermark =
+                    text.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());
         }
 
-        // We assign the WatermarkStrategy after creating the source. In a real-world job you
-        // should integrate the WatermarkStrategy in the source. The Kafka source allows this,
-        // for example.
-        SingleOutputStreamOperator<String> textWithTimestampAndWatermark =
-                text.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());
-
         SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized =
                 textWithTimestampAndWatermark.process(new Tokenizer());
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 54dca343f61..035bb33eff6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1345,7 +1345,17 @@ public class StreamExecutionEnvironment {
      * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
      *     "hdfs://host:port/file/path").
      * @return The data stream that represents the data read from the given file as text lines
+     * @deprecated Use {@code
+     *     FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat()} instead. An
+     *     example of reading a file using a simple {@code TextLineInputFormat}:
+     *     <pre>{@code
+     * FileSource<String> source =
+     *        FileSource.forRecordStreamFormat(
+     *           new TextLineInputFormat(), new Path("/foo/bar"))
+     *        .build();
+     * }</pre>
      */
+    @Deprecated
     public DataStreamSource<String> readTextFile(String filePath) {
         return readTextFile(filePath, "UTF-8");
     }
@@ -1365,7 +1375,17 @@ public class StreamExecutionEnvironment {
      *     "hdfs://host:port/file/path")
      * @param charsetName The name of the character set used to read the file
      * @return The data stream that represents the data read from the given file as text lines
+     * @deprecated Use {@code
+     *     FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat()} instead. An
+     *     example of reading a file using a simple {@code TextLineInputFormat}:
+     *     <pre>{@code
+     * FileSource<String> source =
+     *        FileSource.forRecordStreamFormat(
+     *           new TextLineInputFormat("UTF-8"), new Path("/foo/bar"))
+     *        .build();
+     * }</pre>
      */
+    @Deprecated
     public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
         Preconditions.checkArgument(
                 !StringUtils.isNullOrWhitespaceOnly(filePath),
@@ -1402,7 +1422,17 @@ public class StreamExecutionEnvironment {
      * @param inputFormat The input format used to create the data stream
      * @param <OUT> The type of the returned data stream
      * @return The data stream that represents the data read from the given file
+     * @deprecated Use {@code
+     *     FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat()} instead. An
+     *     example of reading a file using a simple {@code TextLineInputFormat}:
+     *     <pre>{@code
+     * FileSource<String> source =
+     *        FileSource.forRecordStreamFormat(
+     *           new TextLineInputFormat(), new Path("/foo/bar"))
+     *        .build();
+     * }</pre>
      */
+    @Deprecated
     public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath) {
         return readFile(inputFormat, filePath, FileProcessingMode.PROCESS_ONCE, -1);
     }
@@ -1482,7 +1512,18 @@ public class StreamExecutionEnvironment {
      *     millis) between consecutive path scans
      * @param <OUT> The type of the returned data stream
      * @return The data stream that represents the data read from the given file
+     * @deprecated Use {@code
+     *     FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat()} instead. An
+     *     example of reading a file using a simple {@code TextLineInputFormat}:
+     *     <pre>{@code
+     * FileSource<String> source =
+     *        FileSource.forRecordStreamFormat(
+     *           new TextLineInputFormat(), new Path("/foo/bar"))
+     *        .monitorContinuously(Duration.ofSeconds(10))
+     *        .build();
+     * }</pre>
      */
+    @Deprecated
     @PublicEvolving
     public <OUT> DataStreamSource<OUT> readFile(
             FileInputFormat<OUT> inputFormat,
@@ -1557,7 +1598,18 @@ public class StreamExecutionEnvironment {
      *     millis) between consecutive path scans
      * @param <OUT> The type of the returned data stream
      * @return The data stream that represents the data read from the given file
+     * @deprecated Use {@code
+     *     FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat()} instead. An
+     *     example of reading a file using a simple {@code TextLineInputFormat}:
+     *     <pre>{@code
+     * FileSource<String> source =
+     *        FileSource.forRecordStreamFormat(
+     *           new TextLineInputFormat(), new Path("/foo/bar"))
+     *        .monitorContinuously(Duration.ofSeconds(10))
+     *        .build();
+     * }</pre>
      */
+    @Deprecated
     @PublicEvolving
     public <OUT> DataStreamSource<OUT> readFile(
             FileInputFormat<OUT> inputFormat,