You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/15 11:59:35 UTC
[flink] branch master updated: [FLINK-25906][streaming] check explicit env allowed on StreamExecutionEnvironment own.
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new daff5df [FLINK-25906][streaming] check explicit env allowed on StreamExecutionEnvironment own.
daff5df is described below
commit daff5dfa8d9218b33f811611b3209508f2ae3bd7
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Wed Feb 9 09:57:43 2022 +0100
[FLINK-25906][streaming] check explicit env allowed on StreamExecutionEnvironment own.
---
.../tests/test_stream_execution_environment_completeness.py | 2 +-
.../streaming/api/environment/LocalStreamEnvironment.java | 3 +--
.../api/environment/StreamExecutionEnvironment.java | 13 +++++++++++++
.../api/scala/StreamingScalaAPICompletenessTest.scala | 12 +++++++-----
4 files changed, 22 insertions(+), 8 deletions(-)
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
index cca1a6c..5df28f6 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
@@ -49,7 +49,7 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
'socketTextStream', 'initializeContextEnvironment', 'readTextFile',
'setNumberOfExecutionRetries', 'executeAsync', 'registerJobListener',
'clearJobListeners', 'getJobListeners', 'fromSequence', 'getConfiguration',
- 'generateStreamGraph', 'getTransformations'}
+ 'generateStreamGraph', 'getTransformations', 'areExplicitEnvironmentsAllowed'}
if __name__ == '__main__':
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 057a6ee..b2a5646 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.environment;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -53,7 +52,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
}
private static Configuration validateAndGetConfiguration(final Configuration configuration) {
- if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
+ if (!areExplicitEnvironmentsAllowed()) {
throw new InvalidProgramException(
"The LocalStreamEnvironment cannot be used when submitting a program through a client, "
+ "or running in a TestEnvironment context.");
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ea52c5b..54dca34 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -2480,6 +2480,19 @@ public class StreamExecutionEnvironment {
name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
}
+ /**
+ * Checks whether it is currently permitted to explicitly instantiate a LocalEnvironment or a
+ * RemoteEnvironment.
+ *
+ * @return True, if it is possible to explicitly instantiate a LocalEnvironment or a
+ * RemoteEnvironment, false otherwise.
+ */
+ @Internal
+ public static boolean areExplicitEnvironmentsAllowed() {
+ return contextEnvironmentFactory == null
+ && threadLocalContextEnvironmentFactory.get() == null;
+ }
+
// Private helpers.
@SuppressWarnings("unchecked")
private <OUT, T extends TypeInformation<OUT>> T getTypeInfo(
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index ce56cf1..409fe9b 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -67,6 +67,8 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.isForceCheckpointing",
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.generateStreamGraph",
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTransformations",
+ "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment" +
+ ".areExplicitEnvironmentsAllowed",
// TypeHints are only needed for Java API, Scala API doesn't need them
@@ -113,8 +115,8 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
checkMethods(
"ConnectedStreams", "ConnectedStreams",
- classOf[org.apache.flink.streaming.api.datastream.ConnectedStreams[_,_]],
- classOf[ConnectedStreams[_,_]])
+ classOf[org.apache.flink.streaming.api.datastream.ConnectedStreams[_, _]],
+ classOf[ConnectedStreams[_, _]])
checkMethods(
"WindowedStream", "WindowedStream",
@@ -133,12 +135,12 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
checkMethods(
"JoinedStreams.WithWindow", "JoinedStreams.WithWindow",
- classOf[org.apache.flink.streaming.api.datastream.JoinedStreams.WithWindow[_,_,_,_]],
- classOf[JoinedStreams[_,_]#Where[_]#EqualTo#WithWindow[_]])
+ classOf[org.apache.flink.streaming.api.datastream.JoinedStreams.WithWindow[_, _, _, _]],
+ classOf[JoinedStreams[_, _]#Where[_]#EqualTo#WithWindow[_]])
checkMethods(
"CoGroupedStreams.WithWindow", "CoGroupedStreams.WithWindow",
- classOf[org.apache.flink.streaming.api.datastream.CoGroupedStreams.WithWindow[_,_,_,_]],
+ classOf[org.apache.flink.streaming.api.datastream.CoGroupedStreams.WithWindow[_, _, _, _]],
classOf[CoGroupedStreams[_, _]#Where[_]#EqualTo#WithWindow[_]])
}
}