You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/06/24 18:42:53 UTC

[flink] 04/04: [FLINK-18428][API/DataStream] Rename StreamExecutionEnvironment#continuousSource() to StreamExecutionEnvironment#fromSource().

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 49b5103299374641662d66b5165441b532206b71
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Wed Jun 24 21:22:18 2020 +0800

    [FLINK-18428][API/DataStream] Rename StreamExecutionEnvironment#continuousSource() to StreamExecutionEnvironment#fromSource().
    
    This closes #12766
---
 docs/dev/stream/sources.md                                          | 6 +++---
 docs/dev/stream/sources.zh.md                                       | 6 +++---
 .../flink/connector/base/source/reader/CoordinatedSourceITCase.java | 6 +++---
 .../tests/test_stream_execution_environment_completeness.py         | 2 +-
 .../flink/streaming/api/environment/StreamExecutionEnvironment.java | 6 +++---
 .../flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java   | 4 ++--
 .../flink/streaming/api/scala/StreamExecutionEnvironment.scala      | 4 ++--
 .../flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala  | 2 +-
 8 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/docs/dev/stream/sources.md b/docs/dev/stream/sources.md
index 669ca8f..3c3db90 100644
--- a/docs/dev/stream/sources.md
+++ b/docs/dev/stream/sources.md
@@ -187,7 +187,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn
 
 Source mySource = new MySource(...);
 
-DataStream<Integer> stream = env.continuousSource(
+DataStream<Integer> stream = env.fromSource(
         mySource,
         WatermarkStrategy.noWatermarks(),
         "MySourceName");
@@ -200,7 +200,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
 
 val mySource = new MySource(...)
 
-val stream = env.continuousSource(
+val stream = env.fromSource(
       mySource,
       WatermarkStrategy.noWatermarks(),
       "MySourceName")
@@ -352,7 +352,7 @@ Apparently, the `SourceReader` implementations can also implement their own thre
 The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java).
 
 {% highlight java %}
-environment.continuousSource(
+environment.fromSource(
     Source<OUT, ?, ?> source,
     WatermarkStrategy<OUT> timestampsAndWatermarks,
     String sourceName)
diff --git a/docs/dev/stream/sources.zh.md b/docs/dev/stream/sources.zh.md
index 3f20388..a063ecb 100644
--- a/docs/dev/stream/sources.zh.md
+++ b/docs/dev/stream/sources.zh.md
@@ -187,7 +187,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn
 
 Source mySource = new MySource(...);
 
-DataStream<Integer> stream = env.continuousSource(
+DataStream<Integer> stream = env.fromSource(
         mySource,
         WatermarkStrategy.noWatermarks(),
         "MySourceName");
@@ -200,7 +200,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
 
 val mySource = new MySource(...)
 
-val stream = env.continuousSource(
+val stream = env.fromSource(
       mySource,
       WatermarkStrategy.noWatermarks(),
       "MySourceName")
@@ -352,7 +352,7 @@ Apparently, the `SourceReader` implementations can also implement their own thre
 The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java).
 
 {% highlight java %}
-environment.continuousSource(
+environment.fromSource(
     Source<OUT, ?, ?> source,
     WatermarkStrategy<OUT> timestampsAndWatermarks,
     String sourceName)
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
index 6582210..3280c38 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
@@ -45,7 +45,7 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
 	public void testEnumeratorReaderCommunication() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED);
-		DataStream<Integer> stream = env.continuousSource(
+		DataStream<Integer> stream = env.fromSource(
 				source,
 				WatermarkStrategy.noWatermarks(),
 				"TestingSource");
@@ -57,11 +57,11 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		MockBaseSource source1 = new MockBaseSource(2, 10, Boundedness.BOUNDED);
 		MockBaseSource source2 = new MockBaseSource(2, 10, 20, Boundedness.BOUNDED);
-		DataStream<Integer> stream1 = env.continuousSource(
+		DataStream<Integer> stream1 = env.fromSource(
 				source1,
 				WatermarkStrategy.noWatermarks(),
 				"TestingSource1");
-		DataStream<Integer> stream2 = env.continuousSource(
+		DataStream<Integer> stream2 = env.fromSource(
 				source2,
 				WatermarkStrategy.noWatermarks(),
 				"TestingSource2");
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
index c91e086..9764cb4 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
@@ -49,7 +49,7 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
                 'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection',
                 'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'addSource',
                 'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener',
-                'clearJobListeners', 'getJobListeners', "continuousSource"}
+                'clearJobListeners', 'getJobListeners', "fromSource"}
 
 
 if __name__ == '__main__':
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7de2e97..59837ac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1629,11 +1629,11 @@ public class StreamExecutionEnvironment {
 	 * @return the data stream constructed
 	 */
 	@Experimental
-	public <OUT> DataStreamSource<OUT> continuousSource(
+	public <OUT> DataStreamSource<OUT> fromSource(
 			Source<OUT, ?, ?> source,
 			WatermarkStrategy<OUT> timestampsAndWatermarks,
 			String sourceName) {
-		return continuousSource(source, timestampsAndWatermarks, sourceName, null);
+		return fromSource(source, timestampsAndWatermarks, sourceName, null);
 	}
 
 	/**
@@ -1650,7 +1650,7 @@ public class StreamExecutionEnvironment {
 	 * @return the data stream constructed
 	 */
 	@Experimental
-	public <OUT> DataStreamSource<OUT> continuousSource(
+	public <OUT> DataStreamSource<OUT> fromSource(
 			Source<OUT, ?, ?> source,
 			WatermarkStrategy<OUT> timestampsAndWatermarks,
 			String sourceName,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 42edc70..c947325 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -283,7 +283,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	@Test
 	public void testOperatorCoordinatorAddedToJobVertex() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		DataStream<Integer> stream = env.continuousSource(
+		DataStream<Integer> stream = env.fromSource(
 				new MockSource(Boundedness.BOUNDED, 1),
 				WatermarkStrategy.noWatermarks(),
 				"TestingSource");
@@ -493,7 +493,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	@Test
 	public void testCoordinatedOperator() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		DataStream<Integer> source = env.continuousSource(
+		DataStream<Integer> source = env.fromSource(
 				new MockSource(Boundedness.BOUNDED, 1),
 				WatermarkStrategy.noWatermarks(),
 				"TestSource");
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 925d571..9ab3acf 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -666,13 +666,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     * Create a DataStream using a [[Source]].
     */
   @Experimental
-  def continuousSource[T: TypeInformation](
+  def fromSource[T: TypeInformation](
       source: Source[T, _ <: SourceSplit, _],
       watermarkStrategy: WatermarkStrategy[T],
       sourceName: String): DataStream[T] = {
 
     val typeInfo = implicitly[TypeInformation[T]]
-    asScalaStream(javaEnv.continuousSource(source, watermarkStrategy, sourceName, typeInfo))
+    asScalaStream(javaEnv.fromSource(source, watermarkStrategy, sourceName, typeInfo))
   }
 
   /**
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
index fa503e0..8765cb3 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
@@ -40,7 +40,7 @@ class StreamExecutionEnvironmentTest {
     implicit val typeInfo: TypeInformation[Integer] = new MockTypeInfo()
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    val stream = env.continuousSource(
+    val stream = env.fromSource(
       new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1),
       WatermarkStrategy.noWatermarks(),
       "test source")