You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/06/24 18:42:53 UTC
[flink] 04/04: [FLINK-18428][API/DataStream] Rename
StreamExecutionEnvironment#continuousSource() to
StreamExecutionEnvironment#source().
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 49b5103299374641662d66b5165441b532206b71
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Wed Jun 24 21:22:18 2020 +0800
[FLINK-18428][API/DataStream] Rename StreamExecutionEnvironment#continuousSource() to StreamExecutionEnvironment#source().
This closes #12766
---
docs/dev/stream/sources.md | 6 +++---
docs/dev/stream/sources.zh.md | 6 +++---
.../flink/connector/base/source/reader/CoordinatedSourceITCase.java | 6 +++---
.../tests/test_stream_execution_environment_completeness.py | 2 +-
.../flink/streaming/api/environment/StreamExecutionEnvironment.java | 6 +++---
.../flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java | 4 ++--
.../flink/streaming/api/scala/StreamExecutionEnvironment.scala | 4 ++--
.../flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala | 2 +-
8 files changed, 18 insertions(+), 18 deletions(-)
diff --git a/docs/dev/stream/sources.md b/docs/dev/stream/sources.md
index 669ca8f..3c3db90 100644
--- a/docs/dev/stream/sources.md
+++ b/docs/dev/stream/sources.md
@@ -187,7 +187,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn
Source mySource = new MySource(...);
-DataStream<Integer> stream = env.continuousSource(
+DataStream<Integer> stream = env.fromSource(
mySource,
WatermarkStrategy.noWatermarks(),
"MySourceName");
@@ -200,7 +200,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
val mySource = new MySource(...)
-val stream = env.continuousSource(
+val stream = env.fromSource(
mySource,
WatermarkStrategy.noWatermarks(),
"MySourceName")
@@ -352,7 +352,7 @@ Apparently, the `SourceReader` implementations can also implement their own thre
The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java).
{% highlight java %}
-environment.continuousSource(
+environment.fromSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName)
diff --git a/docs/dev/stream/sources.zh.md b/docs/dev/stream/sources.zh.md
index 3f20388..a063ecb 100644
--- a/docs/dev/stream/sources.zh.md
+++ b/docs/dev/stream/sources.zh.md
@@ -187,7 +187,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn
Source mySource = new MySource(...);
-DataStream<Integer> stream = env.continuousSource(
+DataStream<Integer> stream = env.fromSource(
mySource,
WatermarkStrategy.noWatermarks(),
"MySourceName");
@@ -200,7 +200,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
val mySource = new MySource(...)
-val stream = env.continuousSource(
+val stream = env.fromSource(
mySource,
WatermarkStrategy.noWatermarks(),
"MySourceName")
@@ -352,7 +352,7 @@ Apparently, the `SourceReader` implementations can also implement their own thre
The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java).
{% highlight java %}
-environment.continuousSource(
+environment.fromSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName)
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
index 6582210..3280c38 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
@@ -45,7 +45,7 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
public void testEnumeratorReaderCommunication() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED);
- DataStream<Integer> stream = env.continuousSource(
+ DataStream<Integer> stream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"TestingSource");
@@ -57,11 +57,11 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MockBaseSource source1 = new MockBaseSource(2, 10, Boundedness.BOUNDED);
MockBaseSource source2 = new MockBaseSource(2, 10, 20, Boundedness.BOUNDED);
- DataStream<Integer> stream1 = env.continuousSource(
+ DataStream<Integer> stream1 = env.fromSource(
source1,
WatermarkStrategy.noWatermarks(),
"TestingSource1");
- DataStream<Integer> stream2 = env.continuousSource(
+ DataStream<Integer> stream2 = env.fromSource(
source2,
WatermarkStrategy.noWatermarks(),
"TestingSource2");
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
index c91e086..9764cb4 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
@@ -49,7 +49,7 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection',
'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'addSource',
'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener',
- 'clearJobListeners', 'getJobListeners', "continuousSource"}
+ 'clearJobListeners', 'getJobListeners', "fromSource"}
if __name__ == '__main__':
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7de2e97..59837ac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1629,11 +1629,11 @@ public class StreamExecutionEnvironment {
* @return the data stream constructed
*/
@Experimental
- public <OUT> DataStreamSource<OUT> continuousSource(
+ public <OUT> DataStreamSource<OUT> fromSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName) {
- return continuousSource(source, timestampsAndWatermarks, sourceName, null);
+ return fromSource(source, timestampsAndWatermarks, sourceName, null);
}
/**
@@ -1650,7 +1650,7 @@ public class StreamExecutionEnvironment {
* @return the data stream constructed
*/
@Experimental
- public <OUT> DataStreamSource<OUT> continuousSource(
+ public <OUT> DataStreamSource<OUT> fromSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 42edc70..c947325 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -283,7 +283,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
@Test
public void testOperatorCoordinatorAddedToJobVertex() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Integer> stream = env.continuousSource(
+ DataStream<Integer> stream = env.fromSource(
new MockSource(Boundedness.BOUNDED, 1),
WatermarkStrategy.noWatermarks(),
"TestingSource");
@@ -493,7 +493,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
@Test
public void testCoordinatedOperator() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Integer> source = env.continuousSource(
+ DataStream<Integer> source = env.fromSource(
new MockSource(Boundedness.BOUNDED, 1),
WatermarkStrategy.noWatermarks(),
"TestSource");
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 925d571..9ab3acf 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -666,13 +666,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* Create a DataStream using a [[Source]].
*/
@Experimental
- def continuousSource[T: TypeInformation](
+ def fromSource[T: TypeInformation](
source: Source[T, _ <: SourceSplit, _],
watermarkStrategy: WatermarkStrategy[T],
sourceName: String): DataStream[T] = {
val typeInfo = implicitly[TypeInformation[T]]
- asScalaStream(javaEnv.continuousSource(source, watermarkStrategy, sourceName, typeInfo))
+ asScalaStream(javaEnv.fromSource(source, watermarkStrategy, sourceName, typeInfo))
}
/**
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
index fa503e0..8765cb3 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
@@ -40,7 +40,7 @@ class StreamExecutionEnvironmentTest {
implicit val typeInfo: TypeInformation[Integer] = new MockTypeInfo()
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val stream = env.continuousSource(
+ val stream = env.fromSource(
new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1),
WatermarkStrategy.noWatermarks(),
"test source")