You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/06/24 18:45:39 UTC

[flink] branch release-1.11 updated (62c7265 -> 369d4a6)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 62c7265  [FLINK-18426] Remove incompatible deprecated keys from ClusterOptions
     new a60a4a9  [FLINK-18429][DataStream API] Make CheckpointListener.notifyCheckpointAborted(checkpointId) a default method.
     new 7abf7ba  [hotfix][DataStream API] Fix checkstyle issues and JavaDocs in CheckpointListener.
     new 25cad5f  [FLINK-18430][DataStream API] Classify CheckpointedFunction and CheckpointListener as @Public
     new 369d4a6  [FLINK-18428][API/DataStream] Rename StreamExecutionEnvironment#continuousSource() to StreamExecutionEnvironment#fromSource().

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/stream/sources.md                           |  6 +++---
 docs/dev/stream/sources.zh.md                        |  6 +++---
 .../base/source/reader/CoordinatedSourceITCase.java  |  6 +++---
 ...test_stream_execution_environment_completeness.py |  2 +-
 .../flink/runtime/state/CheckpointListener.java      | 20 +++++++++++---------
 .../api/checkpoint/CheckpointedFunction.java         |  4 ++--
 .../api/environment/StreamExecutionEnvironment.java  |  6 +++---
 .../api/graph/StreamingJobGraphGeneratorTest.java    |  4 ++--
 .../api/scala/StreamExecutionEnvironment.scala       |  4 ++--
 .../api/scala/StreamExecutionEnvironmentTest.scala   |  2 +-
 10 files changed, 31 insertions(+), 29 deletions(-)


[flink] 03/04: [FLINK-18430][DataStream API] Classify CheckpointedFunction and CheckpointListener as @Public

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 25cad5fb517bc8a6a6e2a37c78e43c474a99bbd9
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Jun 24 18:07:22 2020 +0200

    [FLINK-18430][DataStream API] Classify CheckpointedFunction and CheckpointListener as @Public
---
 .../main/java/org/apache/flink/runtime/state/CheckpointListener.java  | 4 ++--
 .../apache/flink/streaming/api/checkpoint/CheckpointedFunction.java   | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
index adc4baf..a32b597 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
@@ -18,14 +18,14 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Public;
 
 /**
  * This interface must be implemented by functions/operations that want to receive
  * a commit notification once a checkpoint has been completely acknowledged by all
  * participants.
  */
-@PublicEvolving
+@Public
 public interface CheckpointListener {
 
 	/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
index 604e7f4..5aeeb34 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.checkpoint;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -140,7 +140,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
  * @see ListCheckpointed
  * @see RuntimeContext
  */
-@PublicEvolving
+@Public
 public interface CheckpointedFunction {
 
 	/**


[flink] 01/04: [FLINK-18429][DataStream API] Make CheckpointListener.notifyCheckpointAborted(checkpointId) a default method.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a60a4a9c71a2648176ca46a25230920674130d01
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Jun 24 17:26:55 2020 +0200

    [FLINK-18429][DataStream API] Make CheckpointListener.notifyCheckpointAborted(checkpointId) a default method.
    
    This avoids breaking many user programs that use this interface.
---
 .../main/java/org/apache/flink/runtime/state/CheckpointListener.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
index 13c8e39..ddf2f2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
@@ -45,5 +45,5 @@ public interface CheckpointListener {
 	 * @param checkpointId The ID of the checkpoint that has been aborted.
 	 * @throws Exception
 	 */
-	void notifyCheckpointAborted(long checkpointId) throws Exception;
+	default void notifyCheckpointAborted(long checkpointId) throws Exception {};
 }


[flink] 02/04: [hotfix][DataStream API] Fix checkstyle issues and JavaDocs in CheckpointListener.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7abf7ba1a52eefd2ec2213f2ffb9f0e24bdf4b82
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Jun 24 17:51:22 2020 +0200

    [hotfix][DataStream API] Fix checkstyle issues and JavaDocs in CheckpointListener.
---
 .../apache/flink/runtime/state/CheckpointListener.java   | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
index ddf2f2d..adc4baf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.runtime.state;
 
+package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.PublicEvolving;
 
@@ -30,12 +30,13 @@ public interface CheckpointListener {
 
 	/**
 	 * This method is called as a notification once a distributed checkpoint has been completed.
-	 * 
-	 * Note that any exception during this method will not cause the checkpoint to
+	 *
+	 * <p>Note that any exception during this method will not cause the checkpoint to
 	 * fail any more.
-	 * 
+	 *
 	 * @param checkpointId The ID of the checkpoint that has been completed.
-	 * @throws Exception
+	 * @throws Exception This method can propagate exceptions, which leads to a failure/recovery for
+	 *                   the task. Note that this will NOT lead to the checkpoint being revoked.
 	 */
 	void notifyCheckpointComplete(long checkpointId) throws Exception;
 
@@ -43,7 +44,8 @@ public interface CheckpointListener {
 	 * This method is called as a notification once a distributed checkpoint has been aborted.
 	 *
 	 * @param checkpointId The ID of the checkpoint that has been aborted.
-	 * @throws Exception
+	 * @throws Exception This method can propagate exceptions, which leads to a failure/recovery for
+	 *                   the task.
 	 */
-	default void notifyCheckpointAborted(long checkpointId) throws Exception {};
+	default void notifyCheckpointAborted(long checkpointId) throws Exception {}
 }


[flink] 04/04: [FLINK-18428][API/DataStream] Rename StreamExecutionEnvironment#continuousSource() to StreamExecutionEnvironment#fromSource().

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 369d4a6ce46dddfd2a14ad66ac2e189bd4827158
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Wed Jun 24 21:22:18 2020 +0800

    [FLINK-18428][API/DataStream] Rename StreamExecutionEnvironment#continuousSource() to StreamExecutionEnvironment#fromSource().
    
    This closes #12766
---
 docs/dev/stream/sources.md                                          | 6 +++---
 docs/dev/stream/sources.zh.md                                       | 6 +++---
 .../flink/connector/base/source/reader/CoordinatedSourceITCase.java | 6 +++---
 .../tests/test_stream_execution_environment_completeness.py         | 2 +-
 .../flink/streaming/api/environment/StreamExecutionEnvironment.java | 6 +++---
 .../flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java   | 4 ++--
 .../flink/streaming/api/scala/StreamExecutionEnvironment.scala      | 4 ++--
 .../flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala  | 2 +-
 8 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/docs/dev/stream/sources.md b/docs/dev/stream/sources.md
index 669ca8f..3c3db90 100644
--- a/docs/dev/stream/sources.md
+++ b/docs/dev/stream/sources.md
@@ -187,7 +187,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn
 
 Source mySource = new MySource(...);
 
-DataStream<Integer> stream = env.continuousSource(
+DataStream<Integer> stream = env.fromSource(
         mySource,
         WatermarkStrategy.noWatermarks(),
         "MySourceName");
@@ -200,7 +200,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
 
 val mySource = new MySource(...)
 
-val stream = env.continuousSource(
+val stream = env.fromSource(
       mySource,
       WatermarkStrategy.noWatermarks(),
       "MySourceName")
@@ -352,7 +352,7 @@ Apparently, the `SourceReader` implementations can also implement their own thre
 The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java).
 
 {% highlight java %}
-environment.continuousSource(
+environment.fromSource(
     Source<OUT, ?, ?> source,
     WatermarkStrategy<OUT> timestampsAndWatermarks,
     String sourceName)
diff --git a/docs/dev/stream/sources.zh.md b/docs/dev/stream/sources.zh.md
index 3f20388..a063ecb 100644
--- a/docs/dev/stream/sources.zh.md
+++ b/docs/dev/stream/sources.zh.md
@@ -187,7 +187,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn
 
 Source mySource = new MySource(...);
 
-DataStream<Integer> stream = env.continuousSource(
+DataStream<Integer> stream = env.fromSource(
         mySource,
         WatermarkStrategy.noWatermarks(),
         "MySourceName");
@@ -200,7 +200,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
 
 val mySource = new MySource(...)
 
-val stream = env.continuousSource(
+val stream = env.fromSource(
       mySource,
       WatermarkStrategy.noWatermarks(),
       "MySourceName")
@@ -352,7 +352,7 @@ Apparently, the `SourceReader` implementations can also implement their own thre
 The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java).
 
 {% highlight java %}
-environment.continuousSource(
+environment.fromSource(
     Source<OUT, ?, ?> source,
     WatermarkStrategy<OUT> timestampsAndWatermarks,
     String sourceName)
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
index 6582210..3280c38 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
@@ -45,7 +45,7 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
 	public void testEnumeratorReaderCommunication() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED);
-		DataStream<Integer> stream = env.continuousSource(
+		DataStream<Integer> stream = env.fromSource(
 				source,
 				WatermarkStrategy.noWatermarks(),
 				"TestingSource");
@@ -57,11 +57,11 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		MockBaseSource source1 = new MockBaseSource(2, 10, Boundedness.BOUNDED);
 		MockBaseSource source2 = new MockBaseSource(2, 10, 20, Boundedness.BOUNDED);
-		DataStream<Integer> stream1 = env.continuousSource(
+		DataStream<Integer> stream1 = env.fromSource(
 				source1,
 				WatermarkStrategy.noWatermarks(),
 				"TestingSource1");
-		DataStream<Integer> stream2 = env.continuousSource(
+		DataStream<Integer> stream2 = env.fromSource(
 				source2,
 				WatermarkStrategy.noWatermarks(),
 				"TestingSource2");
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
index c91e086..9764cb4 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
@@ -49,7 +49,7 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
                 'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection',
                 'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'addSource',
                 'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener',
-                'clearJobListeners', 'getJobListeners', "continuousSource"}
+                'clearJobListeners', 'getJobListeners', "fromSource"}
 
 
 if __name__ == '__main__':
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7de2e97..59837ac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1629,11 +1629,11 @@ public class StreamExecutionEnvironment {
 	 * @return the data stream constructed
 	 */
 	@Experimental
-	public <OUT> DataStreamSource<OUT> continuousSource(
+	public <OUT> DataStreamSource<OUT> fromSource(
 			Source<OUT, ?, ?> source,
 			WatermarkStrategy<OUT> timestampsAndWatermarks,
 			String sourceName) {
-		return continuousSource(source, timestampsAndWatermarks, sourceName, null);
+		return fromSource(source, timestampsAndWatermarks, sourceName, null);
 	}
 
 	/**
@@ -1650,7 +1650,7 @@ public class StreamExecutionEnvironment {
 	 * @return the data stream constructed
 	 */
 	@Experimental
-	public <OUT> DataStreamSource<OUT> continuousSource(
+	public <OUT> DataStreamSource<OUT> fromSource(
 			Source<OUT, ?, ?> source,
 			WatermarkStrategy<OUT> timestampsAndWatermarks,
 			String sourceName,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 42edc70..c947325 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -283,7 +283,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	@Test
 	public void testOperatorCoordinatorAddedToJobVertex() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		DataStream<Integer> stream = env.continuousSource(
+		DataStream<Integer> stream = env.fromSource(
 				new MockSource(Boundedness.BOUNDED, 1),
 				WatermarkStrategy.noWatermarks(),
 				"TestingSource");
@@ -493,7 +493,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	@Test
 	public void testCoordinatedOperator() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		DataStream<Integer> source = env.continuousSource(
+		DataStream<Integer> source = env.fromSource(
 				new MockSource(Boundedness.BOUNDED, 1),
 				WatermarkStrategy.noWatermarks(),
 				"TestSource");
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 925d571..9ab3acf 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -666,13 +666,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     * Create a DataStream using a [[Source]].
     */
   @Experimental
-  def continuousSource[T: TypeInformation](
+  def fromSource[T: TypeInformation](
       source: Source[T, _ <: SourceSplit, _],
       watermarkStrategy: WatermarkStrategy[T],
       sourceName: String): DataStream[T] = {
 
     val typeInfo = implicitly[TypeInformation[T]]
-    asScalaStream(javaEnv.continuousSource(source, watermarkStrategy, sourceName, typeInfo))
+    asScalaStream(javaEnv.fromSource(source, watermarkStrategy, sourceName, typeInfo))
   }
 
   /**
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
index fa503e0..8765cb3 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
@@ -40,7 +40,7 @@ class StreamExecutionEnvironmentTest {
     implicit val typeInfo: TypeInformation[Integer] = new MockTypeInfo()
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    val stream = env.continuousSource(
+    val stream = env.fromSource(
       new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1),
       WatermarkStrategy.noWatermarks(),
       "test source")