You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2023/12/05 09:38:15 UTC

(flink) branch master updated (bc6c2cec37c -> 18c03f2e6c5)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from bc6c2cec37c [FLINK-33663] Serialize CallExpressions into SQL (#23811)
     new e44efbff807 [FLINK-28229][connectors] Introduce FLIP-27 alternative to StreamExecutionEnvironment#fromCollection()
     new 18c03f2e6c5 [FLINK-28229][connectors] Deprecate StreamExecutionEnvironment#fromCollection()

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../program/StreamContextEnvironmentTest.java      |  16 ++--
 .../client/testjar/ForbidConfigurationJob.java     |   2 +-
 .../datagen/source/DataGeneratorSource.java        |   3 +-
 .../flink/streaming/examples/join/WindowJoin.java  |  16 ++--
 .../examples/join/WindowJoinSampleData.java        |  75 +++++-----------
 .../examples/java/basics/StreamSQLExample.java     |   4 +-
 .../flink/formats/protobuf/ProtobufTestHelper.java |   2 +-
 .../apache/flink/state/api/SavepointWriter.java    |   4 +-
 .../flink/state/api/SavepointWriterITCase.java     |  12 +--
 .../api/SavepointWriterUidModificationITCase.java  |   4 +-
 .../state/api/SavepointWriterWindowITCase.java     |   8 +-
 .../flink/state/api/WritableSavepointITCase.java   |   8 +-
 .../flink/python/util/PythonConfigUtilTest.java    |   2 +-
 .../environment/StreamExecutionEnvironment.java    | 100 ++++++++++++++++-----
 .../StreamExecutionEnvironmentTest.java            |  57 +++++-------
 .../ExecutorDiscoveryAndJobClientTest.java         |   2 +-
 .../MultipleInputNodeCreationProcessorTest.java    |  14 ++-
 .../runtime/stream/sql/DataStreamJavaITCase.java   |   6 +-
 .../stream/table/TimeAttributesITCase.scala        |  15 +++-
 .../expressions/utils/ExpressionTestBase.scala     |   2 +-
 .../flink/table/planner/utils/TableTestBase.scala  |   4 +-
 .../table/planner/utils/testTableSourceSinks.scala |  10 +--
 .../multipleinput/MultipleInputTestBase.java       |   2 +-
 .../testframe/testsuites/SinkTestSuiteBase.java    |   2 +-
 .../test/accumulators/AccumulatorLiveITCase.java   |   2 +-
 .../streaming/runtime/BroadcastStateITCase.java    |   4 +-
 .../streaming/runtime/DataStreamPojoITCase.java    |   8 +-
 .../test/streaming/runtime/IterateITCase.java      |   4 +-
 .../streaming/runtime/LatencyMarkerITCase.java     |  10 +--
 .../test/streaming/runtime/PartitionerITCase.java  |   2 +-
 .../test/streaming/runtime/SideOutputITCase.java   |  46 +++++-----
 .../flink/test/streaming/runtime/SinkITCase.java   |   6 +-
 .../flink/test/streaming/runtime/SinkV2ITCase.java |   2 +-
 pom.xml                                            |   2 +
 tools/maven/checkstyle.xml                         |   2 +-
 35 files changed, 241 insertions(+), 217 deletions(-)


(flink) 01/02: [FLINK-28229][connectors] Introduce FLIP-27 alternative to StreamExecutionEnvironment#fromCollection()

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e44efbff8070dca3489550fdeadc5e1ce31e68c1
Author: Alexander Fedulov <14...@users.noreply.github.com>
AuthorDate: Fri Oct 20 00:58:03 2023 +0200

    [FLINK-28229][connectors] Introduce FLIP-27 alternative to StreamExecutionEnvironment#fromCollection()
---
 .../datagen/source/DataGeneratorSource.java        |  3 +-
 .../environment/StreamExecutionEnvironment.java    | 85 ++++++++++++++++------
 pom.xml                                            |  2 +
 3 files changed, 67 insertions(+), 23 deletions(-)

diff --git a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
index a344eb635ad..3d2416c1e16 100644
--- a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
+++ b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
@@ -150,7 +150,8 @@ public class DataGeneratorSource<OUT>
         this.sourceReaderFactory = checkNotNull(sourceReaderFactory);
         this.generatorFunction = checkNotNull(generatorFunction);
         this.typeInfo = checkNotNull(typeInfo);
-        this.numberSource = new NumberSequenceSource(0, count - 1);
+        long to = count > 0 ? count - 1 : 0; // a noop source (0 elements) is used in Table tests
+        this.numberSource = new NumberSequenceSource(0, to);
         ClosureCleaner.clean(
                 generatorFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
         ClosureCleaner.clean(
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 9069b3a0d3c..18dc49d3895 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1222,7 +1222,23 @@ public class StreamExecutionEnvironment implements AutoCloseable {
         return fromData(Arrays.asList(data), typeInfo);
     }
 
-    private <OUT> DataStreamSource<OUT> fromData(
+    /**
+     * Creates a new data stream that contains the given elements. The elements must all be of the
+     * same type, for example, all of the {@link String} or {@link Integer}.
+     *
+     * <p>The framework will try and determine the exact type from the elements. In case of generic
+     * elements, it may be necessary to manually supply the type information via {@link
+     * #fromData(org.apache.flink.api.common.typeinfo.TypeInformation, OUT...)}.
+     *
+     * <p>NOTE: This creates a non-parallel data stream source by default (parallelism of one).
+     * Adjustment of parallelism is supported via {@code setParallelism()} on the result.
+     *
+     * @param data The collection of elements to create the data stream from.
+     * @param typeInfo The type information of the elements.
+     * @param <OUT> The generic type of the returned data stream.
+     * @return The data stream representing the given collection
+     */
+    public <OUT> DataStreamSource<OUT> fromData(
             Collection<OUT> data, TypeInformation<OUT> typeInfo) {
         Preconditions.checkNotNull(data, "Collection must not be null");
 
@@ -1273,6 +1289,51 @@ public class StreamExecutionEnvironment implements AutoCloseable {
         return fromData(Arrays.asList(data), typeInfo);
     }
 
+    /**
+     * Creates a new data stream that contains the given elements.The type of the data stream is
+     * that of the elements in the collection.
+     *
+     * <p>The framework will try and determine the exact type from the collection elements. In case
+     * of generic elements, it may be necessary to manually supply the type information via {@link
+     * #fromData(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
+     *
+     * <p>NOTE: This creates a non-parallel data stream source by default (parallelism of one).
+     * Adjustment of parallelism is supported via {@code setParallelism()} on the result.
+     *
+     * @param data The collection of elements to create the data stream from.
+     * @param <OUT> The generic type of the returned data stream.
+     * @return The data stream representing the given collection
+     */
+    public <OUT> DataStreamSource<OUT> fromData(Collection<OUT> data) {
+        TypeInformation<OUT> typeInfo = extractTypeInfoFromCollection(data);
+        return fromData(data, typeInfo);
+    }
+
+    private static <OUT> TypeInformation<OUT> extractTypeInfoFromCollection(Collection<OUT> data) {
+        Preconditions.checkNotNull(data, "Collection must not be null");
+        if (data.isEmpty()) {
+            throw new IllegalArgumentException("Collection must not be empty");
+        }
+
+        OUT first = data.iterator().next();
+        if (first == null) {
+            throw new IllegalArgumentException("Collection must not contain null elements");
+        }
+
+        TypeInformation<OUT> typeInfo;
+        try {
+            typeInfo = TypeExtractor.getForObject(first);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Could not create TypeInformation for type "
+                            + first.getClass()
+                            + "; please specify the TypeInformation manually via the version of the "
+                            + "method that explicitly accepts it as an argument.",
+                    e);
+        }
+        return typeInfo;
+    }
+
     /**
      * Creates a new data stream that contains a sequence of numbers. This is a parallel source, if
      * you manually set the parallelism to {@code 1} (using {@link
@@ -1415,27 +1476,7 @@ public class StreamExecutionEnvironment implements AutoCloseable {
      * @return The data stream representing the given collection
      */
     public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
-        Preconditions.checkNotNull(data, "Collection must not be null");
-        if (data.isEmpty()) {
-            throw new IllegalArgumentException("Collection must not be empty");
-        }
-
-        OUT first = data.iterator().next();
-        if (first == null) {
-            throw new IllegalArgumentException("Collection must not contain null elements");
-        }
-
-        TypeInformation<OUT> typeInfo;
-        try {
-            typeInfo = TypeExtractor.getForObject(first);
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    "Could not create TypeInformation for type "
-                            + first.getClass()
-                            + "; please specify the TypeInformation manually via "
-                            + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)",
-                    e);
-        }
+        TypeInformation<OUT> typeInfo = extractTypeInfoFromCollection(data);
         return fromCollection(data, typeInfo);
     }
 
diff --git a/pom.xml b/pom.xml
index c964ea7c9a6..18c8b4b3f49 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2299,6 +2299,8 @@ under the License.
 								<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.lang.Object[])</exclude>
 								<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(org.apache.flink.api.common.typeinfo.TypeInformation,java.lang.Object[])</exclude>
 								<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.lang.Class,java.lang.Object[])</exclude>
+								<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection)</exclude>
+								<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
 								<!-- MARKER: end exclusions -->
 							</excludes>
 							<accessModifier>public</accessModifier>


(flink) 02/02: [FLINK-28229][connectors] Deprecate StreamExecutionEnvironment#fromCollection()

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 18c03f2e6c593a772f64cdb5c089e2911d3cbc89
Author: Alexander Fedulov <14...@users.noreply.github.com>
AuthorDate: Mon Nov 27 23:29:27 2023 +0100

    [FLINK-28229][connectors] Deprecate StreamExecutionEnvironment#fromCollection()
---
 .../program/StreamContextEnvironmentTest.java      | 16 ++---
 .../client/testjar/ForbidConfigurationJob.java     |  2 +-
 .../flink/streaming/examples/join/WindowJoin.java  | 16 +++--
 .../examples/join/WindowJoinSampleData.java        | 75 ++++++----------------
 .../examples/java/basics/StreamSQLExample.java     |  4 +-
 .../flink/formats/protobuf/ProtobufTestHelper.java |  2 +-
 .../apache/flink/state/api/SavepointWriter.java    |  4 +-
 .../flink/state/api/SavepointWriterITCase.java     | 12 ++--
 .../api/SavepointWriterUidModificationITCase.java  |  4 +-
 .../state/api/SavepointWriterWindowITCase.java     |  8 +--
 .../flink/state/api/WritableSavepointITCase.java   |  8 +--
 .../flink/python/util/PythonConfigUtilTest.java    |  2 +-
 .../environment/StreamExecutionEnvironment.java    | 15 +++++
 .../StreamExecutionEnvironmentTest.java            | 57 ++++++----------
 .../ExecutorDiscoveryAndJobClientTest.java         |  2 +-
 .../MultipleInputNodeCreationProcessorTest.java    | 14 +++-
 .../runtime/stream/sql/DataStreamJavaITCase.java   |  6 +-
 .../stream/table/TimeAttributesITCase.scala        | 15 +++--
 .../expressions/utils/ExpressionTestBase.scala     |  2 +-
 .../flink/table/planner/utils/TableTestBase.scala  |  4 +-
 .../table/planner/utils/testTableSourceSinks.scala | 10 +--
 .../multipleinput/MultipleInputTestBase.java       |  2 +-
 .../testframe/testsuites/SinkTestSuiteBase.java    |  2 +-
 .../test/accumulators/AccumulatorLiveITCase.java   |  2 +-
 .../streaming/runtime/BroadcastStateITCase.java    |  4 +-
 .../streaming/runtime/DataStreamPojoITCase.java    |  8 +--
 .../test/streaming/runtime/IterateITCase.java      |  4 +-
 .../streaming/runtime/LatencyMarkerITCase.java     | 10 +--
 .../test/streaming/runtime/PartitionerITCase.java  |  2 +-
 .../test/streaming/runtime/SideOutputITCase.java   | 46 ++++++-------
 .../flink/test/streaming/runtime/SinkITCase.java   |  6 +-
 .../flink/test/streaming/runtime/SinkV2ITCase.java |  2 +-
 tools/maven/checkstyle.xml                         |  2 +-
 33 files changed, 174 insertions(+), 194 deletions(-)

diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
index 1af115a402a..ac157b04f93 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java
@@ -78,7 +78,7 @@ class StreamContextEnvironmentTest {
         // Add/mutate values in the configuration
         environment.configure(programConfig);
 
-        environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
+        environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
                 .isInstanceOf(MutatedConfigurationException.class)
                 .hasMessageContainingAll(
@@ -106,7 +106,7 @@ class StreamContextEnvironmentTest {
         // Change the CheckpointConfig
         environment.getCheckpointConfig().setCheckpointStorage(disallowedPath);
 
-        environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
+        environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
                 .isInstanceOf(MutatedConfigurationException.class)
                 .hasMessageContainingAll(
@@ -114,7 +114,7 @@ class StreamContextEnvironmentTest {
 
         environment.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
 
-        environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
+        environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
                 .isInstanceOf(MutatedConfigurationException.class)
                 .hasMessageContainingAll(
@@ -143,7 +143,7 @@ class StreamContextEnvironmentTest {
                         false,
                         Collections.emptyList());
 
-        environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
+        environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
                 .isInstanceOf(MutatedConfigurationException.class)
                 .hasMessageContainingAll(
@@ -169,7 +169,7 @@ class StreamContextEnvironmentTest {
         // Change the CheckpointConfig
         environment.getCheckpointConfig().setCheckpointStorage(allowedPath);
 
-        environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
+        environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
                 .isInstanceOf(ExecutorReachedException.class);
     }
@@ -186,7 +186,7 @@ class StreamContextEnvironmentTest {
         final StreamContextEnvironment environment =
                 constructStreamContextEnvironment(clusterConfig, Collections.emptyList());
 
-        environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
+        environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
                 .isInstanceOf(ExecutorReachedException.class);
     }
@@ -202,7 +202,7 @@ class StreamContextEnvironmentTest {
         final StreamContextEnvironment environment =
                 constructStreamContextEnvironment(clusterConfig, Collections.emptyList());
 
-        environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
+        environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
                 .isInstanceOf(ExecutorReachedException.class);
     }
@@ -235,7 +235,7 @@ class StreamContextEnvironmentTest {
         environment.configure(jobConfig);
         environment.getConfig().setMaxParallelism(1024);
 
-        environment.fromCollection(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
+        environment.fromData(Collections.singleton(1)).sinkTo(new DiscardingSink<>());
         assertThatThrownBy(() -> executor.accept(environment))
                 .isInstanceOf(ExecutorReachedException.class);
         assertThat(environment.getConfig().getGlobalJobParameters().toMap())
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/ForbidConfigurationJob.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/ForbidConfigurationJob.java
index 19d84990f91..8e7f0222e2a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/ForbidConfigurationJob.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/ForbidConfigurationJob.java
@@ -37,7 +37,7 @@ public class ForbidConfigurationJob {
         final StreamExecutionEnvironment env =
                 StreamExecutionEnvironment.getExecutionEnvironment(config);
 
-        env.fromCollection(Lists.newArrayList(1, 2, 3)).sinkTo(new DiscardingSink<>());
+        env.fromData(Lists.newArrayList(1, 2, 3)).sinkTo(new DiscardingSink<>());
         env.execute();
     }
 }
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index e6332090289..3b6d6c2bd89 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -37,8 +37,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.examples.join.WindowJoinSampleData.GradeSource;
-import org.apache.flink.streaming.examples.join.WindowJoinSampleData.SalarySource;
 
 import java.time.Duration;
 
@@ -76,12 +74,18 @@ public class WindowJoin {
 
         // create the data sources for both grades and salaries
         DataStream<Tuple2<String, Integer>> grades =
-                GradeSource.getSource(env, rate)
-                        .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());
+                env.fromSource(
+                                WindowJoinSampleData.getGradeGeneratorSource(rate),
+                                IngestionTimeWatermarkStrategy.create(),
+                                "Grades Data Generator")
+                        .setParallelism(1);
 
         DataStream<Tuple2<String, Integer>> salaries =
-                SalarySource.getSource(env, rate)
-                        .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());
+                env.fromSource(
+                                WindowJoinSampleData.getSalaryGeneratorSource(rate),
+                                IngestionTimeWatermarkStrategy.create(),
+                                "Grades Data Generator")
+                        .setParallelism(1);
 
         // run the actual window join program
         // for testability, this functionality is in a separate method.
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoinSampleData.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoinSampleData.java
index fbaec8e64de..55f59ed436a 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoinSampleData.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoinSampleData.java
@@ -20,13 +20,11 @@ package org.apache.flink.streaming.examples.join;
 
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.examples.utils.ThrottledIterator;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
 
-import java.io.Serializable;
-import java.util.Iterator;
 import java.util.Random;
 
 /** Sample data for the {@link WindowJoin} example. */
@@ -38,58 +36,27 @@ public class WindowJoinSampleData {
     static final int SALARY_MAX = 10000;
 
     /** Continuously generates (name, grade). */
-    public static class GradeSource implements Iterator<Tuple2<String, Integer>>, Serializable {
-
-        private final Random rnd = new Random(hashCode());
-
-        @Override
-        public boolean hasNext() {
-            return true;
-        }
-
-        @Override
-        public Tuple2<String, Integer> next() {
-            return new Tuple2<>(NAMES[rnd.nextInt(NAMES.length)], rnd.nextInt(GRADE_COUNT) + 1);
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-        public static DataStream<Tuple2<String, Integer>> getSource(
-                StreamExecutionEnvironment env, long rate) {
-            return env.fromCollection(
-                    new ThrottledIterator<>(new GradeSource(), rate),
-                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
-        }
+    public static DataGeneratorSource<Tuple2<String, Integer>> getGradeGeneratorSource(
+            double elementsPerSecond) {
+        return getTupleGeneratorSource(GRADE_COUNT, elementsPerSecond);
     }
 
     /** Continuously generates (name, salary). */
-    public static class SalarySource implements Iterator<Tuple2<String, Integer>>, Serializable {
-
-        private final Random rnd = new Random(hashCode());
-
-        @Override
-        public boolean hasNext() {
-            return true;
-        }
-
-        @Override
-        public Tuple2<String, Integer> next() {
-            return new Tuple2<>(NAMES[rnd.nextInt(NAMES.length)], rnd.nextInt(SALARY_MAX) + 1);
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
+    public static DataGeneratorSource<Tuple2<String, Integer>> getSalaryGeneratorSource(
+            double elementsPerSecond) {
+        return getTupleGeneratorSource(SALARY_MAX, elementsPerSecond);
+    }
 
-        public static DataStream<Tuple2<String, Integer>> getSource(
-                StreamExecutionEnvironment env, long rate) {
-            return env.fromCollection(
-                    new ThrottledIterator<>(new SalarySource(), rate),
-                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
-        }
+    private static DataGeneratorSource<Tuple2<String, Integer>> getTupleGeneratorSource(
+            int maxValue, double elementsPerSecond) {
+        final Random rnd = new Random();
+        final GeneratorFunction<Long, Tuple2<String, Integer>> generatorFunction =
+                index -> new Tuple2<>(NAMES[rnd.nextInt(NAMES.length)], rnd.nextInt(maxValue) + 1);
+
+        return new DataGeneratorSource<>(
+                generatorFunction,
+                Long.MAX_VALUE,
+                RateLimiterStrategy.perSecond(elementsPerSecond),
+                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
     }
 }
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java
index 194b7e1f90b..ad7ad7ef40e 100644
--- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java
@@ -55,14 +55,14 @@ public final class StreamSQLExample {
         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 
         final DataStream<Order> orderA =
-                env.fromCollection(
+                env.fromData(
                         Arrays.asList(
                                 new Order(1L, "beer", 3),
                                 new Order(1L, "diaper", 4),
                                 new Order(3L, "rubber", 2)));
 
         final DataStream<Order> orderB =
-                env.fromCollection(
+                env.fromData(
                         Arrays.asList(
                                 new Order(2L, "pen", 3),
                                 new Order(2L, "rubber", 3),
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufTestHelper.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufTestHelper.java
index daaa8d68fcc..c24b1909aa8 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufTestHelper.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufTestHelper.java
@@ -56,7 +56,7 @@ public class ProtobufTestHelper {
                 (Row) DataFormatConverters.getConverterForDataType(rowDataType).toExternal(rowData);
         TypeInformation<Row> rowTypeInfo =
                 (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(rowDataType);
-        DataStream<Row> rows = env.fromCollection(Collections.singletonList(row), rowTypeInfo);
+        DataStream<Row> rows = env.fromData(Collections.singletonList(row), rowTypeInfo);
 
         Table table = tableEnv.fromDataStream(rows);
         tableEnv.createTemporaryView("t", table);
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
index 99f73fa524b..01f51ca997a 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
@@ -355,9 +355,7 @@ public class SavepointWriter {
         }
 
         DataStream<OperatorState> existingOperatorStates =
-                executionEnvironment
-                        .fromCollection(existingOperators)
-                        .name("existingOperatorStates");
+                executionEnvironment.fromData(existingOperators).name("existingOperatorStates");
 
         existingOperatorStates
                 .flatMap(new StatePathExtractor())
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
index f5fb6c16dbc..13e6c47ac75 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
@@ -113,12 +113,12 @@ public class SavepointWriterITCase extends AbstractTestBase {
         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
 
         StateBootstrapTransformation<Account> transformation =
-                OperatorTransformation.bootstrapWith(env.fromCollection(accounts))
+                OperatorTransformation.bootstrapWith(env.fromData(accounts))
                         .keyBy(acc -> acc.id)
                         .transform(new AccountBootstrapper());
 
         StateBootstrapTransformation<CurrencyRate> broadcastTransformation =
-                OperatorTransformation.bootstrapWith(env.fromCollection(currencyRates))
+                OperatorTransformation.bootstrapWith(env.fromData(currencyRates))
                         .transform(new CurrencyBootstrapFunction());
 
         SavepointWriter writer =
@@ -141,15 +141,15 @@ public class SavepointWriterITCase extends AbstractTestBase {
         }
 
         DataStream<Account> stream =
-                env.fromCollection(accounts)
+                env.fromData(accounts)
                         .keyBy(acc -> acc.id)
                         .flatMap(new UpdateAndGetAccount())
                         .uid(ACCOUNT_UID);
 
         final CloseableIterator<Account> results = stream.collectAsync();
 
-        env.fromCollection(currencyRates)
-                .connect(env.fromCollection(currencyRates).broadcast(descriptor))
+        env.fromData(currencyRates)
+                .connect(env.fromData(currencyRates).broadcast(descriptor))
                 .process(new CurrencyValidationFunction())
                 .uid(CURRENCY_UID)
                 .sinkTo(new DiscardingSink<>());
@@ -192,7 +192,7 @@ public class SavepointWriterITCase extends AbstractTestBase {
         }
 
         DataStream<Account> stream =
-                sEnv.fromCollection(accounts)
+                sEnv.fromData(accounts)
                         .keyBy(acc -> acc.id)
                         .flatMap(new UpdateAndGetAccount())
                         .uid(ACCOUNT_UID);
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java
index e6c83ec7c30..1b6b665d687 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java
@@ -161,7 +161,7 @@ public class SavepointWriterUidModificationITCase {
 
     private static StateBootstrapTransformation<Integer> bootstrap(
             StreamExecutionEnvironment env, Collection<Integer> data) {
-        return OperatorTransformation.bootstrapWith(env.fromCollection(data))
+        return OperatorTransformation.bootstrapWith(env.fromData(data))
                 .keyBy(v -> v)
                 .transform(new StateBootstrapper());
     }
@@ -194,7 +194,7 @@ public class SavepointWriterUidModificationITCase {
         final List<CloseableIterator<Integer>> iterators = new ArrayList<>();
         for (Tuple2<Collection<Integer>, String> assertion : assertions) {
             iterators.add(
-                    env.fromCollection(assertion.f0)
+                    env.fromData(assertion.f0)
                             .keyBy(v -> v)
                             .map(new StateReader())
                             .uid(assertion.f1)
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
index 32345098aca..bc08d725d2f 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
@@ -146,7 +146,7 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
 
         DataStream<Tuple2<String, Integer>> bootstrapData =
-                env.fromCollection(WORDS)
+                env.fromData(WORDS)
                         .map(word -> Tuple2.of(word, 1))
                         .returns(TUPLE_TYPE_INFO)
                         .assignTimestampsAndWatermarks(
@@ -190,7 +190,7 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
 
         DataStream<Tuple2<String, Integer>> bootstrapData =
-                env.fromCollection(WORDS)
+                env.fromData(WORDS)
                         .map(word -> Tuple2.of(word, 1))
                         .returns(TUPLE_TYPE_INFO)
                         .assignTimestampsAndWatermarks(
@@ -236,7 +236,7 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
 
         DataStream<Tuple2<String, Integer>> bootstrapData =
-                env.fromCollection(WORDS)
+                env.fromData(WORDS)
                         .map(word -> Tuple2.of(word, 1), TUPLE_TYPE_INFO)
                         .assignTimestampsAndWatermarks(
                                 WatermarkStrategy.<Tuple2<String, Integer>>noWatermarks()
@@ -283,7 +283,7 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
 
         DataStream<Tuple2<String, Integer>> bootstrapData =
-                env.fromCollection(WORDS)
+                env.fromData(WORDS)
                         .map(word -> Tuple2.of(word, 1))
                         .returns(TUPLE_TYPE_INFO)
                         .assignTimestampsAndWatermarks(
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java
index 1d386ef084d..e94ad57ee42 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java
@@ -151,15 +151,15 @@ public class WritableSavepointITCase extends AbstractTestBase {
         sEnv.setStateBackend(backend);
 
         DataStream<Account> stream =
-                sEnv.fromCollection(accounts)
+                sEnv.fromData(accounts)
                         .keyBy(acc -> acc.id)
                         .flatMap(new UpdateAndGetAccount())
                         .uid(ACCOUNT_UID);
 
         CloseableIterator<Account> results = stream.collectAsync();
 
-        sEnv.fromCollection(currencyRates)
-                .connect(sEnv.fromCollection(currencyRates).broadcast(descriptor))
+        sEnv.fromData(currencyRates)
+                .connect(sEnv.fromData(currencyRates).broadcast(descriptor))
                 .process(new CurrencyValidationFunction())
                 .uid(CURRENCY_UID)
                 .sinkTo(new DiscardingSink<>());
@@ -195,7 +195,7 @@ public class WritableSavepointITCase extends AbstractTestBase {
         sEnv.setStateBackend(backend);
 
         DataStream<Account> stream =
-                sEnv.fromCollection(accounts)
+                sEnv.fromData(accounts)
                         .keyBy(acc -> acc.id)
                         .flatMap(new UpdateAndGetAccount())
                         .uid(ACCOUNT_UID);
diff --git a/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java b/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java
index f9e0362181b..322f21a34ba 100644
--- a/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java
@@ -39,7 +39,7 @@ class PythonConfigUtilTest {
         config.set(PipelineOptions.NAME, jobName);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
 
-        env.fromCollection(Collections.singletonList("test")).sinkTo(new DiscardingSink<>());
+        env.fromData(Collections.singletonList("test")).sinkTo(new DiscardingSink<>());
         StreamGraph streamGraph = env.getStreamGraph(true);
         assertThat(streamGraph.getJobName()).isEqualTo(jobName);
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 18dc49d3895..4b116805166 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -41,6 +41,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.Utils;
@@ -1474,6 +1475,8 @@ public class StreamExecutionEnvironment implements AutoCloseable {
      * @param data The collection of elements to create the data stream from.
      * @param <OUT> The generic type of the returned data stream.
      * @return The data stream representing the given collection
+     * @deprecated This method will be removed a future release, possibly as early as version 2.0.
+     *     Use {@link #fromData(Collection)} instead.
      */
     public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
         TypeInformation<OUT> typeInfo = extractTypeInfoFromCollection(data);
@@ -1490,6 +1493,8 @@ public class StreamExecutionEnvironment implements AutoCloseable {
      * @param typeInfo The TypeInformation for the produced data stream
      * @param <OUT> The type of the returned data stream
      * @return The data stream representing the given collection
+     * @deprecated This method will be removed a future release, possibly as early as version 2.0.
+     *     Use {@link #fromData(Collection, TypeInformation)} instead.
      */
     public <OUT> DataStreamSource<OUT> fromCollection(
             Collection<OUT> data, TypeInformation<OUT> typeInfo) {
@@ -1519,6 +1524,11 @@ public class StreamExecutionEnvironment implements AutoCloseable {
      * @return The data stream representing the elements in the iterator
      * @see #fromCollection(java.util.Iterator,
      *     org.apache.flink.api.common.typeinfo.TypeInformation)
+     * @deprecated This method will be removed a future release, possibly as early as version 2.0.
+     *     Use {@link #fromData(Collection, TypeInformation)} instead. For rate-limited data
+     *     generation, use {@link DataGeneratorSource} with {@link RateLimiterStrategy}. If you need
+     *     to use a fixed set of elements in such scenario, combine it with {@link
+     *     FromElementsGeneratorFunction}.
      */
     public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type) {
         return fromCollection(data, TypeExtractor.getForClass(type));
@@ -1540,6 +1550,11 @@ public class StreamExecutionEnvironment implements AutoCloseable {
      * @param typeInfo The TypeInformation for the produced data stream
      * @param <OUT> The type of the returned data stream
      * @return The data stream representing the elements in the iterator
+     * @deprecated This method will be removed a future release, possibly as early as version 2.0.
+     *     Use {@link #fromData(Collection, TypeInformation)} instead. For rate-limited data
+     *     generation, use {@link DataGeneratorSource} with {@link RateLimiterStrategy}. If you need
+     *     to use a fixed set of elements in such scenario, combine it with {@link
+     *     FromElementsGeneratorFunction}.
      */
     public <OUT> DataStreamSource<OUT> fromCollection(
             Iterator<OUT> data, TypeInformation<OUT> typeInfo) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
index 0f1aa2c7a08..b2be45482fb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
@@ -141,14 +141,6 @@ class StreamExecutionEnvironmentTest {
             TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-            DataStreamSource<Integer> dataStream1 =
-                    env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
-
-            assertThatThrownBy(() -> dataStream1.setParallelism(4))
-                    .isInstanceOf(IllegalArgumentException.class);
-
-            dataStream1.sinkTo(new DiscardingSink<>());
-
             DataStreamSource<Integer> dataStream2 =
                     env.fromParallelCollection(new DummySplittableIterator<Integer>(), typeInfo)
                             .setParallelism(4);
@@ -158,9 +150,6 @@ class StreamExecutionEnvironmentTest {
             final StreamGraph streamGraph = env.getStreamGraph();
             streamGraph.getStreamingPlanAsJSON();
 
-            assertThat(streamGraph.getStreamNode(dataStream1.getId()).getParallelism())
-                    .as("Parallelism of collection source must be 1.")
-                    .isOne();
             assertThat(streamGraph.getStreamNode(dataStream2.getId()).getParallelism())
                     .as("Parallelism of parallel collection source must be 4.")
                     .isEqualTo(4);
@@ -172,6 +161,8 @@ class StreamExecutionEnvironmentTest {
 
     @Test
     void testSources() {
+        // TODO: remove this test when SourceFunction API gets removed together with the deprecated
+        //  StreamExecutionEnvironment generateSequence() and fromCollection() methods
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         SourceFunction<Integer> srcFun =
@@ -329,33 +320,25 @@ class StreamExecutionEnvironmentTest {
 
     @Test
     void testGetStreamGraph() {
-        try {
-            TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
-            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-            DataStreamSource<Integer> dataStream1 =
-                    env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
-            dataStream1.sinkTo(new DiscardingSink<>());
-            assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(2);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-            DataStreamSource<Integer> dataStream2 =
-                    env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
-            dataStream2.sinkTo(new DiscardingSink<>());
-            assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(2);
-
-            DataStreamSource<Integer> dataStream3 =
-                    env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
-            dataStream3.sinkTo(new DiscardingSink<>());
-            // Does not clear the transformations.
-            env.getExecutionPlan();
-            DataStreamSource<Integer> dataStream4 =
-                    env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
-            dataStream4.sinkTo(new DiscardingSink<>());
-            assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(4);
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        DataStreamSource<Integer> dataStream1 = env.fromData(1, 2, 3);
+        dataStream1.sinkTo(new DiscardingSink<>());
+        assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(2);
+
+        DataStreamSource<Integer> dataStream2 = env.fromData(1, 2, 3);
+        dataStream2.sinkTo(new DiscardingSink<>());
+        // Previous getStreamGraph() call cleaned dataStream1 transformations
+        assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(2);
+
+        DataStreamSource<Integer> dataStream3 = env.fromData(1, 2, 3);
+        dataStream3.sinkTo(new DiscardingSink<>());
+        // Does not clear the transformations.
+        env.getExecutionPlan();
+        DataStreamSource<Integer> dataStream4 = env.fromData(1, 2, 3);
+        dataStream4.sinkTo(new DiscardingSink<>());
+        // dataStream3 are preserved
+        assertThat(env.getStreamGraph().getStreamNodes().size()).isEqualTo(4);
     }
 
     @Test
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
index 516424be6f8..d71ccd343db 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
@@ -65,7 +65,7 @@ public class ExecutorDiscoveryAndJobClientTest {
     private JobExecutionResult executeTestJobBasedOnConfig(final Configuration configuration)
             throws Exception {
         final StreamExecutionEnvironment env = new StreamExecutionEnvironment(configuration);
-        env.fromCollection(Collections.singletonList(42)).sinkTo(new DiscardingSink<>());
+        env.fromData(Collections.singletonList(42)).sinkTo(new DiscardingSink<>());
         return env.execute();
     }
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
index 6caf9a5a3aa..b976d0ab650 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.mocks.MockSource;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
@@ -39,7 +40,6 @@ import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -125,8 +125,7 @@ class MultipleInputNodeCreationProcessorTest extends TableTestBase {
     }
 
     private void createNonChainableStream(TableTestUtil util) {
-        DataStreamSource<Integer> dataStream =
-                util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3));
+        DataStreamSource<Integer> dataStream = util.getStreamEnv().addSource(new LegacySource());
         TableTestUtil.createTemporaryView(
                 util.tableEnv(),
                 "nonChainableStream",
@@ -172,4 +171,13 @@ class MultipleInputNodeCreationProcessorTest extends TableTestBase {
                         + "'\n"
                         + ")");
     }
+
+    private static class LegacySource implements SourceFunction<Integer> {
+        public void run(SourceContext<Integer> sourceContext) {
+            sourceContext.collect(1);
+        }
+
+        @Override
+        public void cancel() {}
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
index a756210d768..f8ea2a19b9b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
@@ -226,7 +226,7 @@ class DataStreamJavaITCase {
                     Row.of(null, Row.of(false, null), Collections.singletonMap("world", null))
                 };
 
-        final DataStream<Row> dataStream = env.fromCollection(Arrays.asList(rows), typeInfo);
+        final DataStream<Row> dataStream = env.fromData(Arrays.asList(rows), typeInfo);
 
         final TableResult result = tableEnv.fromDataStream(dataStream).execute();
 
@@ -305,7 +305,7 @@ class DataStreamJavaITCase {
                         Tuple2.of(DayOfWeek.MONDAY, ZoneOffset.UTC),
                         Tuple2.of(DayOfWeek.FRIDAY, ZoneOffset.ofHours(5)));
 
-        final DataStream<Tuple2<DayOfWeek, ZoneOffset>> dataStream = env.fromCollection(rawRecords);
+        final DataStream<Tuple2<DayOfWeek, ZoneOffset>> dataStream = env.fromData(rawRecords);
 
         // verify incoming type information
         assertThat(dataStream.getType()).isInstanceOf(TupleTypeInfo.class);
@@ -945,7 +945,7 @@ class DataStreamJavaITCase {
 
     private DataStream<Tuple3<Long, Integer, String>> getWatermarkedDataStream() {
         final DataStream<Tuple3<Long, Integer, String>> dataStream =
-                env.fromCollection(
+                env.fromData(
                         Arrays.asList(
                                 Tuple3.of(1L, 42, "a"),
                                 Tuple3.of(2L, 5, "a"),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/TimeAttributesITCase.scala b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/TimeAttributesITCase.scala
index 22a08755939..60628f53ef6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/TimeAttributesITCase.scala
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/TimeAttributesITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.runtime.stream.table
 
 import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
 import org.apache.flink.api.scala.createTypeInformation
-import org.apache.flink.core.testutils.FlinkMatchers.containsMessage
+import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink}
@@ -36,10 +36,17 @@ import java.time.{Duration, Instant, LocalDateTime, ZoneOffset}
 class TimeAttributesITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {
 
   @TestTemplate
-  def testMissingTimeAttributeThrowsCorrectException(): Unit = {
-    val data = List(1L -> "hello", 2L -> "world")
-    val stream = env.fromCollection[(Long, String)](data)
+  def testMissingTimeAttributeInLegacySourceThrowsCorrectException(): Unit = {
+    //TODO: this test can be removed when SourceFunction gets removed (FLIP-27 sources set TimestampAssigner.NO_TIMESTAMP value as event time when no timestamp is provided. See SourceOutputWithWatermarks#collect())
+    val stream = env
+      .addSource(new SourceFunction[(Long, String)]() {
+        def run(ctx: SourceFunction.SourceContext[(Long, String)]) {
+          ctx.collect(1L -> "hello")
+          ctx.collect(2L -> "world")
+        }
 
+        def cancel() {}
+      })
     tEnv.createTemporaryView("test", stream, $"event_time".rowtime(), $"data")
     val result = tEnv.sqlQuery("SELECT * FROM test")
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index f0f6e622952..dee5eda6111 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -115,7 +115,7 @@ abstract class ExpressionTestBase(isStreaming: Boolean = true) {
       tEnv.getCatalogManager.getDataTypeFactory.createDataType(testDataType)
     }
     if (containsLegacyTypes) {
-      val ds = env.fromCollection(Collections.emptyList[Row](), typeInfo)
+      val ds = env.fromData(Collections.emptyList[Row](), typeInfo)
       tEnv.createTemporaryView(tableName, ds, typeInfo.getFieldNames.map(api.$): _*)
       functions.foreach(f => tEnv.createTemporarySystemFunction(f._1, f._2))
     } else {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 26193a650f4..9d79525373f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -91,6 +91,8 @@ import java.nio.file.{Files, Path, Paths}
 import java.time.Duration
 import java.util.Collections
 
+import scala.collection.JavaConverters._
+
 /** Test base for testing Table API / SQL plans. */
 abstract class TableTestBase {
 
@@ -1409,7 +1411,7 @@ class TestTableSource(override val isBounded: Boolean, schema: TableSchema)
   extends StreamTableSource[Row] {
 
   override def getDataStream(execEnv: environment.StreamExecutionEnvironment): DataStream[Row] = {
-    execEnv.fromCollection(List[Row](), getReturnType)
+    execEnv.fromData(List[Row]().asJava, getReturnType)
   }
 
   override def getReturnType: TypeInformation[Row] = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
index dcd3f200a43..ed2c7811947 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
@@ -202,7 +202,7 @@ class TestTableSourceWithTime[T](
   with DefinedFieldMapping {
 
   override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] = {
-    val dataStream = execEnv.fromCollection(values, returnType)
+    val dataStream = execEnv.fromData(values.asJava, returnType)
     dataStream.getTransformation.setMaxParallelism(1)
     dataStream
   }
@@ -528,7 +528,7 @@ class TestLegacyFilterableTableSource(
     }
 
     execEnv
-      .fromCollection[Row](
+      .fromData[Row](
         applyPredicatesToRows(records).asJava,
         fromDataTypeToTypeInfo(getProducedDataType).asInstanceOf[RowTypeInfo])
       .setParallelism(1)
@@ -929,7 +929,7 @@ class TestStreamTableSource(tableSchema: TableSchema, values: Seq[Row])
   extends StreamTableSource[Row] {
 
   override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
-    execEnv.fromCollection(values, tableSchema.toRowType)
+    execEnv.fromData(values.asJava, tableSchema.toRowType)
   }
 
   override def getProducedDataType: DataType = tableSchema.toRowDataType
@@ -1087,7 +1087,7 @@ class TestPartitionableTableSource(
       data.values.flatten
     }
 
-    execEnv.fromCollection[Row](remainingData, getReturnType).setParallelism(1).setMaxParallelism(1)
+    execEnv.fromData[Row](remainingData, getReturnType).setParallelism(1).setMaxParallelism(1)
   }
 
   override def explainSource(): String = {
@@ -1218,7 +1218,7 @@ class WithoutTimeAttributesTableSource(bounded: Boolean) extends StreamTableSour
     )
     val dataStream =
       execEnv
-        .fromCollection(data)
+        .fromData(data.asJava)
         .returns(fromDataTypeToTypeInfo(getProducedDataType).asInstanceOf[RowTypeInfo])
     dataStream.getTransformation.setMaxParallelism(1)
     dataStream
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputTestBase.java
index 24bdf3ca17b..8420d70221f 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputTestBase.java
@@ -53,7 +53,7 @@ import java.util.stream.Collectors;
 public class MultipleInputTestBase {
 
     protected Transformation<RowData> createSource(StreamExecutionEnvironment env, String... data) {
-        return env.fromCollection(
+        return env.fromData(
                         Arrays.stream(data)
                                 .map(StringData::fromString)
                                 .map(GenericRowData::of)
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
index faeac88842a..461f8dfa432 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
@@ -141,7 +141,7 @@ public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
                                 .build());
         execEnv.enableCheckpointing(50);
         DataStream<T> dataStream =
-                execEnv.fromCollection(testRecords)
+                execEnv.fromData(testRecords)
                         .name("sourceInSinkTest")
                         .setParallelism(1)
                         .returns(externalContext.getProducedType());
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index d048048f8a2..c0b5e017865 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -133,7 +133,7 @@ public class AccumulatorLiveITCase extends TestLogger {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
 
-        DataStream<Integer> input = env.fromCollection(inputData);
+        DataStream<Integer> input = env.fromData(inputData);
         input.flatMap(new NotifyingMapper())
                 .writeUsingOutputFormat(new DummyOutputFormat())
                 .disableChaining();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
index 01c0987a60d..e9822515434 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
@@ -85,7 +85,7 @@ public class BroadcastStateITCase extends AbstractTestBase {
                         .keyBy((KeySelector<Long, Long>) value -> value);
 
         final DataStream<String> srcTwo =
-                env.fromCollection(expected.values())
+                env.fromData(expected.values())
                         .assignTimestampsAndWatermarks(
                                 new CustomWmEmitter<String>() {
 
@@ -145,7 +145,7 @@ public class BroadcastStateITCase extends AbstractTestBase {
                                 });
 
         final DataStream<String> srcTwo =
-                env.fromCollection(expected.values())
+                env.fromData(expected.values())
                         .assignTimestampsAndWatermarks(
                                 new CustomWmEmitter<String>() {
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
index 8c195ae854a..6accaf64034 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
@@ -53,7 +53,7 @@ public class DataStreamPojoITCase extends AbstractTestBase {
         see.getConfig().disableObjectReuse();
         see.setParallelism(3);
 
-        DataStream<Data> dataStream = see.fromCollection(elements);
+        DataStream<Data> dataStream = see.fromData(elements);
 
         DataStream<Data> summedStream =
                 dataStream
@@ -105,7 +105,7 @@ public class DataStreamPojoITCase extends AbstractTestBase {
         see.getConfig().disableObjectReuse();
         see.setParallelism(4);
 
-        DataStream<Data> dataStream = see.fromCollection(elements);
+        DataStream<Data> dataStream = see.fromData(elements);
 
         DataStream<Data> summedStream =
                 dataStream
@@ -160,7 +160,7 @@ public class DataStreamPojoITCase extends AbstractTestBase {
         see.getConfig().disableObjectReuse();
         see.setParallelism(4);
 
-        DataStream<Data> dataStream = see.fromCollection(elements);
+        DataStream<Data> dataStream = see.fromData(elements);
 
         DataStream<Data> summedStream =
                 dataStream
@@ -198,7 +198,7 @@ public class DataStreamPojoITCase extends AbstractTestBase {
     public void testFailOnNestedPojoFieldAccessor() throws Exception {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
 
-        DataStream<Data> dataStream = see.fromCollection(elements);
+        DataStream<Data> dataStream = see.fromData(elements);
         dataStream.keyBy("aaa", "stats.count").sum("stats.nonExistingField");
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
index 6790436f778..147a17dc823 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
@@ -420,7 +420,7 @@ public class IterateITCase extends AbstractTestBase {
                 iterated = new boolean[parallelism];
 
                 DataStream<Boolean> source =
-                        env.fromCollection(Collections.nCopies(parallelism * 2, false))
+                        env.fromData(Collections.nCopies(parallelism * 2, false))
                                 .map(noOpBoolMap)
                                 .name("ParallelizeMap");
 
@@ -693,7 +693,7 @@ public class IterateITCase extends AbstractTestBase {
         env.enableCheckpointing();
 
         DataStream<Boolean> source =
-                env.fromCollection(Collections.nCopies(parallelism * 2, false))
+                env.fromData(Collections.nCopies(parallelism * 2, false))
                         .map(noOpBoolMap)
                         .name("ParallelizeMap");
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.java
index 1340377a519..43f0a835a21 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -32,7 +31,6 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -55,13 +53,11 @@ public class LatencyMarkerITCase {
 
         List<Integer> broadcastData =
                 IntStream.range(0, inputCount).boxed().collect(Collectors.toList());
-        DataStream<Integer> broadcastDataStream =
-                env.fromCollection(broadcastData).setParallelism(1);
+        DataStream<Integer> broadcastDataStream = env.fromData(broadcastData).setParallelism(1);
 
         // broadcast the configurations and create the broadcast state
 
-        DataStream<String> streamWithoutData =
-                env.fromCollection(Collections.emptyList(), TypeInformation.of(String.class));
+        DataStream<String> dataStream = env.fromData("test");
 
         MapStateDescriptor<String, Integer> stateDescriptor =
                 new MapStateDescriptor<>(
@@ -70,7 +66,7 @@ public class LatencyMarkerITCase {
                         BasicTypeInfo.INT_TYPE_INFO);
 
         SingleOutputStreamOperator<Integer> processor =
-                streamWithoutData
+                dataStream
                         .connect(broadcastDataStream.broadcast(stateDescriptor))
                         .process(
                                 new BroadcastProcessFunction<String, Integer, Integer>() {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
index 8489bf9b2aa..ae988039083 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
@@ -99,7 +99,7 @@ public class PartitionerITCase extends AbstractTestBase {
         env.setParallelism(PARALLELISM);
 
         DataStream<Tuple1<String>> src =
-                env.fromCollection(INPUT.stream().map(Tuple1::of).collect(Collectors.toList()));
+                env.fromData(INPUT.stream().map(Tuple1::of).collect(Collectors.toList()));
 
         // partition by hash
         src.keyBy(0).map(new SubtaskIndexAssigner()).addSink(hashPartitionResultSink);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index c8d42a64280..db26f2e1e3e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -231,7 +231,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(3);
 
-        DataStream<Integer> dataStream = env.fromCollection(elements);
+        DataStream<Integer> dataStream = env.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 dataStream.process(
@@ -273,7 +273,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         env.getConfig().enableObjectReuse();
         env.setParallelism(3);
 
-        DataStream<Integer> dataStream = env.fromCollection(elements);
+        DataStream<Integer> dataStream = env.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 dataStream.process(
@@ -316,7 +316,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         env.getConfig().enableObjectReuse();
         env.setParallelism(3);
 
-        DataStream<Integer> dataStream = env.fromCollection(elements);
+        DataStream<Integer> dataStream = env.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 dataStream.process(
@@ -356,7 +356,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> dataStream = see.fromCollection(elements);
+        DataStream<Integer> dataStream = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 dataStream.process(
@@ -390,7 +390,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> dataStream = see.fromCollection(elements);
+        DataStream<Integer> dataStream = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 dataStream.process(
@@ -427,8 +427,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> ds1 = see.fromCollection(elements);
-        DataStream<Integer> ds2 = see.fromCollection(elements);
+        DataStream<Integer> ds1 = see.fromData(elements);
+        DataStream<Integer> ds2 = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 ds1.connect(ds2)
@@ -482,8 +482,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> ds1 = see.fromCollection(elements);
-        DataStream<Integer> ds2 = see.fromCollection(elements);
+        DataStream<Integer> ds1 = see.fromData(elements);
+        DataStream<Integer> ds2 = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 ds1.connect(ds2)
@@ -538,7 +538,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> dataStream = see.fromCollection(elements);
+        DataStream<Integer> dataStream = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 dataStream
@@ -586,8 +586,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> ds1 = see.fromCollection(elements);
-        DataStream<Integer> ds2 = see.fromCollection(elements);
+        DataStream<Integer> ds1 = see.fromData(elements);
+        DataStream<Integer> ds2 = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 ds1.keyBy(i -> i)
@@ -640,8 +640,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> ds1 = see.fromCollection(elements);
-        DataStream<Integer> ds2 = see.fromCollection(elements);
+        DataStream<Integer> ds1 = see.fromData(elements);
+        DataStream<Integer> ds2 = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 ds1.keyBy(i -> i)
@@ -707,8 +707,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> ds1 = see.fromCollection(elements);
-        DataStream<Integer> ds2 = see.fromCollection(elements);
+        DataStream<Integer> ds1 = see.fromData(elements);
+        DataStream<Integer> ds2 = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 ds1.keyBy(i -> i)
@@ -766,8 +766,8 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> ds1 = see.fromCollection(elements);
-        DataStream<Integer> ds2 = see.fromCollection(elements);
+        DataStream<Integer> ds1 = see.fromData(elements);
+        DataStream<Integer> ds2 = see.fromData(elements);
 
         SingleOutputStreamOperator<Integer> passThroughtStream =
                 ds1.keyBy(i -> i)
@@ -830,7 +830,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> dataStream = see.fromCollection(elements);
+        DataStream<Integer> dataStream = see.fromData(elements);
 
         dataStream
                 .process(
@@ -886,7 +886,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(1);
 
-        DataStream<Integer> dataStream = see.fromCollection(elements);
+        DataStream<Integer> dataStream = see.fromData(elements);
 
         OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late") {};
 
@@ -939,7 +939,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> dataStream = see.fromCollection(elements);
+        DataStream<Integer> dataStream = see.fromData(elements);
 
         OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late") {};
 
@@ -989,7 +989,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(3);
 
-        DataStream<Integer> dataStream = see.fromCollection(elements);
+        DataStream<Integer> dataStream = see.fromData(elements);
 
         OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};
 
@@ -1036,7 +1036,7 @@ public class SideOutputITCase extends AbstractTestBase implements Serializable {
         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
         see.setParallelism(1);
 
-        DataStream<Integer> dataStream = see.fromCollection(elements);
+        DataStream<Integer> dataStream = see.fromData(elements);
 
         OutputTag<String> sideOutputTag = new OutputTag<String>("side") {};
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
index fdc79986223..2eaa88b045c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
@@ -151,7 +151,7 @@ public class SinkITCase extends AbstractTestBase {
     public void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
 
-        env.fromCollection(SOURCE_DATA)
+        env.fromData(SOURCE_DATA)
                 .sinkTo(
                         TestSink.newBuilder()
                                 .setDefaultCommitter(
@@ -193,7 +193,7 @@ public class SinkITCase extends AbstractTestBase {
     public void writerAndCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
 
-        env.fromCollection(SOURCE_DATA)
+        env.fromData(SOURCE_DATA)
                 .sinkTo(
                         TestSink.newBuilder()
                                 .setDefaultCommitter(
@@ -238,7 +238,7 @@ public class SinkITCase extends AbstractTestBase {
     public void writerAndGlobalCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
 
-        env.fromCollection(SOURCE_DATA)
+        env.fromData(SOURCE_DATA)
                 .sinkTo(
                         TestSink.newBuilder()
                                 .setCommittableSerializer(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
index 70d94004748..b23dc34ed74 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -107,7 +107,7 @@ public class SinkV2ITCase extends AbstractTestBase {
     public void writerAndCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
 
-        env.fromCollection(SOURCE_DATA)
+        env.fromData(SOURCE_DATA)
                 .sinkTo(
                         TestSinkV2.<Integer>newBuilder()
                                 .setDefaultCommitter(
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
index f03fea8d578..f313c69eba0 100644
--- a/tools/maven/checkstyle.xml
+++ b/tools/maven/checkstyle.xml
@@ -61,7 +61,7 @@ This file is based on the checkstyle file of Apache Beam.
 	-->
 
 	<module name="FileLength">
-		<property name="max" value="3000"/>
+		<property name="max" value="3100"/>
 	</module>
 
 	<!-- All Java AST specific tests live under TreeWalker module. -->