You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/06/24 18:42:49 UTC

[flink] branch master updated (ca53401 -> 49b5103)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from ca53401  [FLINK-18426] Remove incompatible deprecated keys from ClusterOptions
     new 4776813  [FLINK-18429][DataStream API] Make CheckpointListener.notifyCheckpointAborted(checkpointId) a default method.
     new b689cea  [hotfix][DataStream API] Fix checkstyle issues and JavaDocs in CheckpointListener.
     new 95b9adb  [FLINK-18430][DataStream API] Classify CheckpointedFunction and CheckpointListener as @Public
     new 49b5103  [FLINK-18428][API/DataStream] Rename StreamExecutionEnvironment#continuousSource() to StreamExecutionEnvironment#fromSource().

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/stream/sources.md                           |  6 +++---
 docs/dev/stream/sources.zh.md                        |  6 +++---
 .../base/source/reader/CoordinatedSourceITCase.java  |  6 +++---
 ...test_stream_execution_environment_completeness.py |  2 +-
 .../flink/runtime/state/CheckpointListener.java      | 20 +++++++++++---------
 .../api/checkpoint/CheckpointedFunction.java         |  4 ++--
 .../api/environment/StreamExecutionEnvironment.java  |  6 +++---
 .../api/graph/StreamingJobGraphGeneratorTest.java    |  4 ++--
 .../api/scala/StreamExecutionEnvironment.scala       |  4 ++--
 .../api/scala/StreamExecutionEnvironmentTest.scala   |  2 +-
 10 files changed, 31 insertions(+), 29 deletions(-)


[flink] 03/04: [FLINK-18430][DataStream API] Classify CheckpointedFunction and CheckpointListener as @Public

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95b9adbeaa7058c4fc804a5277cbaa958485d63b
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Jun 24 18:07:22 2020 +0200

    [FLINK-18430][DataStream API] Classify CheckpointedFunction and CheckpointListener as @Public
    
    This closes #12767
---
 .../main/java/org/apache/flink/runtime/state/CheckpointListener.java  | 4 ++--
 .../apache/flink/streaming/api/checkpoint/CheckpointedFunction.java   | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
index adc4baf..a32b597 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
@@ -18,14 +18,14 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Public;
 
 /**
  * This interface must be implemented by functions/operations that want to receive
  * a commit notification once a checkpoint has been completely acknowledged by all
  * participants.
  */
-@PublicEvolving
+@Public
 public interface CheckpointListener {
 
 	/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
index 604e7f4..5aeeb34 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.checkpoint;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -140,7 +140,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
  * @see ListCheckpointed
  * @see RuntimeContext
  */
-@PublicEvolving
+@Public
 public interface CheckpointedFunction {
 
 	/**


[flink] 01/04: [FLINK-18429][DataStream API] Make CheckpointListener.notifyCheckpointAborted(checkpointId) a default method.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4776813cc335080dbe8684f51c3aa0f7f1d774d0
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Jun 24 17:26:55 2020 +0200

    [FLINK-18429][DataStream API] Make CheckpointListener.notifyCheckpointAborted(checkpointId) a default method.
    
    This avoids breaking many user programs that use this interface.
    
    This closes #12767
---
 .../main/java/org/apache/flink/runtime/state/CheckpointListener.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
index 13c8e39..ddf2f2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
@@ -45,5 +45,5 @@ public interface CheckpointListener {
 	 * @param checkpointId The ID of the checkpoint that has been aborted.
 	 * @throws Exception
 	 */
-	void notifyCheckpointAborted(long checkpointId) throws Exception;
+	default void notifyCheckpointAborted(long checkpointId) throws Exception {};
 }


[flink] 04/04: [FLINK-18428][API/DataStream] Rename StreamExecutionEnvironment#continuousSource() to StreamExecutionEnvironment#fromSource().

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 49b5103299374641662d66b5165441b532206b71
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Wed Jun 24 21:22:18 2020 +0800

    [FLINK-18428][API/DataStream] Rename StreamExecutionEnvironment#continuousSource() to StreamExecutionEnvironment#fromSource().
    
    This closes #12766
---
 docs/dev/stream/sources.md                                          | 6 +++---
 docs/dev/stream/sources.zh.md                                       | 6 +++---
 .../flink/connector/base/source/reader/CoordinatedSourceITCase.java | 6 +++---
 .../tests/test_stream_execution_environment_completeness.py         | 2 +-
 .../flink/streaming/api/environment/StreamExecutionEnvironment.java | 6 +++---
 .../flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java   | 4 ++--
 .../flink/streaming/api/scala/StreamExecutionEnvironment.scala      | 4 ++--
 .../flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala  | 2 +-
 8 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/docs/dev/stream/sources.md b/docs/dev/stream/sources.md
index 669ca8f..3c3db90 100644
--- a/docs/dev/stream/sources.md
+++ b/docs/dev/stream/sources.md
@@ -187,7 +187,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn
 
 Source mySource = new MySource(...);
 
-DataStream<Integer> stream = env.continuousSource(
+DataStream<Integer> stream = env.fromSource(
         mySource,
         WatermarkStrategy.noWatermarks(),
         "MySourceName");
@@ -200,7 +200,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
 
 val mySource = new MySource(...)
 
-val stream = env.continuousSource(
+val stream = env.fromSource(
       mySource,
       WatermarkStrategy.noWatermarks(),
       "MySourceName")
@@ -352,7 +352,7 @@ Apparently, the `SourceReader` implementations can also implement their own thre
 The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java).
 
 {% highlight java %}
-environment.continuousSource(
+environment.fromSource(
     Source<OUT, ?, ?> source,
     WatermarkStrategy<OUT> timestampsAndWatermarks,
     String sourceName)
diff --git a/docs/dev/stream/sources.zh.md b/docs/dev/stream/sources.zh.md
index 3f20388..a063ecb 100644
--- a/docs/dev/stream/sources.zh.md
+++ b/docs/dev/stream/sources.zh.md
@@ -187,7 +187,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn
 
 Source mySource = new MySource(...);
 
-DataStream<Integer> stream = env.continuousSource(
+DataStream<Integer> stream = env.fromSource(
         mySource,
         WatermarkStrategy.noWatermarks(),
         "MySourceName");
@@ -200,7 +200,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
 
 val mySource = new MySource(...)
 
-val stream = env.continuousSource(
+val stream = env.fromSource(
       mySource,
       WatermarkStrategy.noWatermarks(),
       "MySourceName")
@@ -352,7 +352,7 @@ Apparently, the `SourceReader` implementations can also implement their own thre
 The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java).
 
 {% highlight java %}
-environment.continuousSource(
+environment.fromSource(
     Source<OUT, ?, ?> source,
     WatermarkStrategy<OUT> timestampsAndWatermarks,
     String sourceName)
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
index 6582210..3280c38 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
@@ -45,7 +45,7 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
 	public void testEnumeratorReaderCommunication() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED);
-		DataStream<Integer> stream = env.continuousSource(
+		DataStream<Integer> stream = env.fromSource(
 				source,
 				WatermarkStrategy.noWatermarks(),
 				"TestingSource");
@@ -57,11 +57,11 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		MockBaseSource source1 = new MockBaseSource(2, 10, Boundedness.BOUNDED);
 		MockBaseSource source2 = new MockBaseSource(2, 10, 20, Boundedness.BOUNDED);
-		DataStream<Integer> stream1 = env.continuousSource(
+		DataStream<Integer> stream1 = env.fromSource(
 				source1,
 				WatermarkStrategy.noWatermarks(),
 				"TestingSource1");
-		DataStream<Integer> stream2 = env.continuousSource(
+		DataStream<Integer> stream2 = env.fromSource(
 				source2,
 				WatermarkStrategy.noWatermarks(),
 				"TestingSource2");
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
index c91e086..9764cb4 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
@@ -49,7 +49,7 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
                 'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection',
                 'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'addSource',
                 'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener',
-                'clearJobListeners', 'getJobListeners', "continuousSource"}
+                'clearJobListeners', 'getJobListeners', "fromSource"}
 
 
 if __name__ == '__main__':
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7de2e97..59837ac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1629,11 +1629,11 @@ public class StreamExecutionEnvironment {
 	 * @return the data stream constructed
 	 */
 	@Experimental
-	public <OUT> DataStreamSource<OUT> continuousSource(
+	public <OUT> DataStreamSource<OUT> fromSource(
 			Source<OUT, ?, ?> source,
 			WatermarkStrategy<OUT> timestampsAndWatermarks,
 			String sourceName) {
-		return continuousSource(source, timestampsAndWatermarks, sourceName, null);
+		return fromSource(source, timestampsAndWatermarks, sourceName, null);
 	}
 
 	/**
@@ -1650,7 +1650,7 @@ public class StreamExecutionEnvironment {
 	 * @return the data stream constructed
 	 */
 	@Experimental
-	public <OUT> DataStreamSource<OUT> continuousSource(
+	public <OUT> DataStreamSource<OUT> fromSource(
 			Source<OUT, ?, ?> source,
 			WatermarkStrategy<OUT> timestampsAndWatermarks,
 			String sourceName,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 42edc70..c947325 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -283,7 +283,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	@Test
 	public void testOperatorCoordinatorAddedToJobVertex() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		DataStream<Integer> stream = env.continuousSource(
+		DataStream<Integer> stream = env.fromSource(
 				new MockSource(Boundedness.BOUNDED, 1),
 				WatermarkStrategy.noWatermarks(),
 				"TestingSource");
@@ -493,7 +493,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	@Test
 	public void testCoordinatedOperator() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		DataStream<Integer> source = env.continuousSource(
+		DataStream<Integer> source = env.fromSource(
 				new MockSource(Boundedness.BOUNDED, 1),
 				WatermarkStrategy.noWatermarks(),
 				"TestSource");
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 925d571..9ab3acf 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -666,13 +666,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     * Create a DataStream using a [[Source]].
     */
   @Experimental
-  def continuousSource[T: TypeInformation](
+  def fromSource[T: TypeInformation](
       source: Source[T, _ <: SourceSplit, _],
       watermarkStrategy: WatermarkStrategy[T],
       sourceName: String): DataStream[T] = {
 
     val typeInfo = implicitly[TypeInformation[T]]
-    asScalaStream(javaEnv.continuousSource(source, watermarkStrategy, sourceName, typeInfo))
+    asScalaStream(javaEnv.fromSource(source, watermarkStrategy, sourceName, typeInfo))
   }
 
   /**
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
index fa503e0..8765cb3 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
@@ -40,7 +40,7 @@ class StreamExecutionEnvironmentTest {
     implicit val typeInfo: TypeInformation[Integer] = new MockTypeInfo()
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    val stream = env.continuousSource(
+    val stream = env.fromSource(
       new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1),
       WatermarkStrategy.noWatermarks(),
       "test source")


[flink] 02/04: [hotfix][DataStream API] Fix checkstyle issues and JavaDocs in CheckpointListener.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b689cea3c3bcada76dec316ae41054a4a798e4e5
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Jun 24 17:51:22 2020 +0200

    [hotfix][DataStream API] Fix checkstyle issues and JavaDocs in CheckpointListener.
---
 .../apache/flink/runtime/state/CheckpointListener.java   | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
index ddf2f2d..adc4baf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.runtime.state;
 
+package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.PublicEvolving;
 
@@ -30,12 +30,13 @@ public interface CheckpointListener {
 
 	/**
 	 * This method is called as a notification once a distributed checkpoint has been completed.
-	 * 
-	 * Note that any exception during this method will not cause the checkpoint to
+	 *
+	 * <p>Note that any exception during this method will not cause the checkpoint to
 	 * fail any more.
-	 * 
+	 *
 	 * @param checkpointId The ID of the checkpoint that has been completed.
-	 * @throws Exception
+	 * @throws Exception This method can propagate exceptions, which leads to a failure/recovery for
+	 *                   the task. Note that this will NOT lead to the checkpoint being revoked.
 	 */
 	void notifyCheckpointComplete(long checkpointId) throws Exception;
 
@@ -43,7 +44,8 @@ public interface CheckpointListener {
 	 * This method is called as a notification once a distributed checkpoint has been aborted.
 	 *
 	 * @param checkpointId The ID of the checkpoint that has been aborted.
-	 * @throws Exception
+	 * @throws Exception This method can propagate exceptions, which leads to a failure/recovery for
+	 *                   the task.
 	 */
-	default void notifyCheckpointAborted(long checkpointId) throws Exception {};
+	default void notifyCheckpointAborted(long checkpointId) throws Exception {}
 }