You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/15 11:59:35 UTC

[flink] branch master updated: [FLINK-25906][streaming] check explicit env allowed on StreamExecutionEnvironment own.

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new daff5df  [FLINK-25906][streaming] check explicit env allowed on StreamExecutionEnvironment own.
daff5df is described below

commit daff5dfa8d9218b33f811611b3209508f2ae3bd7
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Wed Feb 9 09:57:43 2022 +0100

    [FLINK-25906][streaming] check explicit env allowed on StreamExecutionEnvironment own.
---
 .../tests/test_stream_execution_environment_completeness.py |  2 +-
 .../streaming/api/environment/LocalStreamEnvironment.java   |  3 +--
 .../api/environment/StreamExecutionEnvironment.java         | 13 +++++++++++++
 .../api/scala/StreamingScalaAPICompletenessTest.scala       | 12 +++++++-----
 4 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
index cca1a6c..5df28f6 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
@@ -49,7 +49,7 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
                 'socketTextStream', 'initializeContextEnvironment', 'readTextFile',
                 'setNumberOfExecutionRetries', 'executeAsync', 'registerJobListener',
                 'clearJobListeners', 'getJobListeners', 'fromSequence', 'getConfiguration',
-                'generateStreamGraph', 'getTransformations'}
+                'generateStreamGraph', 'getTransformations', 'areExplicitEnvironmentsAllowed'}
 
 
 if __name__ == '__main__':
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 057a6ee..b2a5646 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.environment;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -53,7 +52,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
     }
 
     private static Configuration validateAndGetConfiguration(final Configuration configuration) {
-        if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
+        if (!areExplicitEnvironmentsAllowed()) {
             throw new InvalidProgramException(
                     "The LocalStreamEnvironment cannot be used when submitting a program through a client, "
                             + "or running in a TestEnvironment context.");
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ea52c5b..54dca34 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -2480,6 +2480,19 @@ public class StreamExecutionEnvironment {
                         name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
     }
 
+    /**
+     * Checks whether it is currently permitted to explicitly instantiate a LocalEnvironment or a
+     * RemoteEnvironment.
+     *
+     * @return True, if it is possible to explicitly instantiate a LocalEnvironment or a
+     *     RemoteEnvironment, false otherwise.
+     */
+    @Internal
+    public static boolean areExplicitEnvironmentsAllowed() {
+        return contextEnvironmentFactory == null
+                && threadLocalContextEnvironmentFactory.get() == null;
+    }
+
     // Private helpers.
     @SuppressWarnings("unchecked")
     private <OUT, T extends TypeInformation<OUT>> T getTypeInfo(
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index ce56cf1..409fe9b 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -67,6 +67,8 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
       "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.isForceCheckpointing",
       "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.generateStreamGraph",
       "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTransformations",
+      "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment" +
+        ".areExplicitEnvironmentsAllowed",
 
 
       // TypeHints are only needed for Java API, Scala API doesn't need them
@@ -113,8 +115,8 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
 
     checkMethods(
       "ConnectedStreams", "ConnectedStreams",
-      classOf[org.apache.flink.streaming.api.datastream.ConnectedStreams[_,_]],
-      classOf[ConnectedStreams[_,_]])
+      classOf[org.apache.flink.streaming.api.datastream.ConnectedStreams[_, _]],
+      classOf[ConnectedStreams[_, _]])
 
     checkMethods(
       "WindowedStream", "WindowedStream",
@@ -133,12 +135,12 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
 
     checkMethods(
       "JoinedStreams.WithWindow", "JoinedStreams.WithWindow",
-      classOf[org.apache.flink.streaming.api.datastream.JoinedStreams.WithWindow[_,_,_,_]],
-      classOf[JoinedStreams[_,_]#Where[_]#EqualTo#WithWindow[_]])
+      classOf[org.apache.flink.streaming.api.datastream.JoinedStreams.WithWindow[_, _, _, _]],
+      classOf[JoinedStreams[_, _]#Where[_]#EqualTo#WithWindow[_]])
 
     checkMethods(
       "CoGroupedStreams.WithWindow", "CoGroupedStreams.WithWindow",
-      classOf[org.apache.flink.streaming.api.datastream.CoGroupedStreams.WithWindow[_,_,_,_]],
+      classOf[org.apache.flink.streaming.api.datastream.CoGroupedStreams.WithWindow[_, _, _, _]],
       classOf[CoGroupedStreams[_, _]#Where[_]#EqualTo#WithWindow[_]])
   }
 }