Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/10 10:07:16 UTC

[GitHub] [flink] deadwind4 opened a new pull request, #19685: [FLINK-27297][python] Add get_execution_environment method with configuration argument.

deadwind4 opened a new pull request, #19685:
URL: https://github.com/apache/flink/pull/19685

   ## What is the purpose of the change
   
   Expose the Java `StreamExecutionEnvironment#getExecutionEnvironment(Configuration)` method in PyFlink.
   
   
   ## Brief change log
   
     - *Add an optional `configuration` argument to the `get_execution_environment` method*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Added a test that validates that a configuration passed to `get_execution_environment` takes effect*
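The test itself is not shown in this thread. As a rough sketch of its shape, here is a unittest-style check written against hypothetical stand-ins (`FakeEnvironment`, `DEFAULT_OPTIONS`, and a dict-based `get_execution_environment`) so that it runs without a JVM. It is not the PyFlink API and not the test added in this PR.

```python
import unittest
from typing import Dict, Optional

# Hypothetical defaults standing in for values read from flink-conf.yaml.
DEFAULT_OPTIONS: Dict[str, str] = {"akka.ask.timeout": "10 s"}


class FakeEnvironment:
    """Hypothetical stand-in for StreamExecutionEnvironment that only stores options."""

    def __init__(self, options: Dict[str, str]):
        self._options = dict(options)

    def get_option(self, key: str) -> Optional[str]:
        return self._options.get(key)


def get_execution_environment(configuration: Optional[Dict[str, str]] = None) -> FakeEnvironment:
    """Hypothetical factory: options passed explicitly override the defaults."""
    options = dict(DEFAULT_OPTIONS)
    if configuration is not None:
        options.update(configuration)
    return FakeEnvironment(options)


class GetExecutionEnvironmentTest(unittest.TestCase):
    def test_passed_configuration_is_effective(self):
        env = get_execution_environment({"akka.ask.timeout": "20 s"})
        self.assertEqual(env.get_option("akka.ask.timeout"), "20 s")

    def test_defaults_without_configuration(self):
        env = get_execution_environment()
        self.assertEqual(env.get_option("akka.ask.timeout"), "10 s")
```

Saved as a module, the two cases can be run with `python -m unittest`. A real test would read the option back from the JVM-side environment instead of a Python dict.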
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (PyDocs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] dianfu commented on a diff in pull request #19685: [FLINK-27297][python] Add get_execution_environment method with configuration argument.

dianfu commented on code in PR #19685:
URL: https://github.com/apache/flink/pull/19685#discussion_r870878831


##########
flink-python/pyflink/table/tests/test_table_environment_api.py:
##########
@@ -237,12 +237,12 @@ def setUp(self) -> None:
         from pyflink.datastream import StreamExecutionEnvironment
 
         super(DataStreamConversionTestCases, self).setUp()
-        self.env = StreamExecutionEnvironment.get_execution_environment()
+        config = Configuration()
+        config.setString("akka.ask.timeout", "20 s")

Review Comment:
   ```suggestion
           config.set_string("akka.ask.timeout", "20 s")
   ```
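The suggestion follows PyFlink's naming convention: the Python `Configuration` wrapper exposes snake_case methods such as `set_string`, which delegate to the camelCase methods of the wrapped Java object. The following simplified model illustrates that delegation. `JavaConfigurationStub` and `ConfigurationModel` are hypothetical stand-ins for the Py4J objects. The model also assumes that the wrapper does not forward unknown attributes such as `setString` to the Java side.

```python
class JavaConfigurationStub:
    """Hypothetical stand-in for the JVM-side Configuration reached through Py4J."""

    def __init__(self):
        self._data = {}

    def setString(self, key, value):
        self._data[key] = value

    def getString(self, key, default_value):
        return self._data.get(key, default_value)


class ConfigurationModel:
    """Simplified model of a snake_case Python wrapper around the Java object."""

    def __init__(self):
        self._j_configuration = JavaConfigurationStub()

    def set_string(self, key: str, value: str) -> "ConfigurationModel":
        # Delegate to the camelCase method of the wrapped (stub) Java object.
        self._j_configuration.setString(key, value)
        return self  # returning self allows chained calls in this model

    def get_string(self, key: str, default_value: str) -> str:
        return self._j_configuration.getString(key, default_value)


config = ConfigurationModel()
config.set_string("akka.ask.timeout", "20 s")
print(config.get_string("akka.ask.timeout", "10 s"))  # 20 s

try:
    config.setString("akka.ask.timeout", "20 s")  # camelCase is not defined on the wrapper
except AttributeError as e:
    print("AttributeError:", e)
```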




[GitHub] [flink] dianfu closed pull request #19685: [FLINK-27297][python] Add get_execution_environment method with configuration argument.

dianfu closed pull request #19685: [FLINK-27297][python] Add get_execution_environment method with configuration argument.
URL: https://github.com/apache/flink/pull/19685



[GitHub] [flink] deadwind4 commented on a diff in pull request #19685: [FLINK-27297][python] Add get_execution_environment method with configuration argument.

deadwind4 commented on code in PR #19685:
URL: https://github.com/apache/flink/pull/19685#discussion_r869946982


##########
flink-python/pyflink/datastream/stream_execution_environment.py:
##########
@@ -807,17 +807,30 @@ def register_cached_file(self, file_path: str, name: str, executable: bool = Fal
         self._j_stream_execution_environment.registerCachedFile(file_path, name, executable)
 
     @staticmethod
-    def get_execution_environment() -> 'StreamExecutionEnvironment':
+    def get_execution_environment(configuration: Configuration = None) \
+            -> 'StreamExecutionEnvironment':
         """
         Creates an execution environment that represents the context in which the
         program is currently executed. If the program is invoked standalone, this
         method returns a local execution environment.
 
+        When executed from the command line the given configuration is stacked on top of the
+        global configuration which comes from the flink-conf.yaml, potentially overriding
+        duplicated options.
+
+        :param configuration: The configuration to instantiate the environment with.
         :return: The execution environment of the context in which the program is executed.
         """
         gateway = get_gateway()
-        j_stream_exection_environment = gateway.jvm.org.apache.flink.streaming.api.environment\
-            .StreamExecutionEnvironment.getExecutionEnvironment()
+        JStreamExecutionEnvironment = gateway.jvm.org.apache.flink.streaming.api.environment \
+            .StreamExecutionEnvironment
+
+        if configuration:
+            j_stream_exection_environment = JStreamExecutionEnvironment.getExecutionEnvironment()

Review Comment:
   I found that [test_table_environment_api.py](https://github.com/apache/flink/pull/19685/files#diff-51b6ef54d67d0bdb98ffaac4535b82acdce2c0e591164a7018350aeda663ecb3) is the only test that can be updated.
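The docstring in the diff says that, when the program is run from the command line, the given configuration is stacked on top of the global configuration from flink-conf.yaml, with duplicated options overridden. Below is a minimal dictionary-based model of that precedence rule. Plain dicts stand in for Flink `Configuration` objects, `stack_configuration` is a hypothetical helper rather than a Flink API, and the option values are illustrative only.

```python
from typing import Dict


def stack_configuration(global_conf: Dict[str, str], user_conf: Dict[str, str]) -> Dict[str, str]:
    """Return the effective options: user_conf layered over global_conf.

    Keys present in both take the user value; all other keys are kept.
    """
    effective = dict(global_conf)  # copy so the global config is not mutated
    effective.update(user_conf)    # the later layer wins for duplicated keys
    return effective


global_conf = {"parallelism.default": "1", "akka.ask.timeout": "10 s"}
user_conf = {"akka.ask.timeout": "20 s"}
print(stack_configuration(global_conf, user_conf))
# {'parallelism.default': '1', 'akka.ask.timeout': '20 s'}
```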




[GitHub] [flink] dianfu commented on a diff in pull request #19685: [FLINK-27297][python] Add get_execution_environment method with configuration argument.

dianfu commented on code in PR #19685:
URL: https://github.com/apache/flink/pull/19685#discussion_r869146129


##########
flink-python/pyflink/datastream/stream_execution_environment.py:
##########
@@ -807,17 +807,30 @@ def register_cached_file(self, file_path: str, name: str, executable: bool = Fal
         self._j_stream_execution_environment.registerCachedFile(file_path, name, executable)
 
     @staticmethod
-    def get_execution_environment() -> 'StreamExecutionEnvironment':
+    def get_execution_environment(configuration: Configuration = None) \
+            -> 'StreamExecutionEnvironment':
         """
         Creates an execution environment that represents the context in which the
         program is currently executed. If the program is invoked standalone, this
         method returns a local execution environment.
 
+        When executed from the command line the given configuration is stacked on top of the
+        global configuration which comes from the flink-conf.yaml, potentially overriding
+        duplicated options.
+
+        :param configuration: The configuration to instantiate the environment with.
         :return: The execution environment of the context in which the program is executed.
         """
         gateway = get_gateway()
-        j_stream_exection_environment = gateway.jvm.org.apache.flink.streaming.api.environment\
-            .StreamExecutionEnvironment.getExecutionEnvironment()
+        JStreamExecutionEnvironment = gateway.jvm.org.apache.flink.streaming.api.environment \
+            .StreamExecutionEnvironment
+
+        if configuration:
+            j_stream_exection_environment = JStreamExecutionEnvironment.getExecutionEnvironment()

Review Comment:
   ```suggestion
               j_stream_exection_environment = JStreamExecutionEnvironment.getExecutionEnvironment(configuration._j_configuration)
   ```
   I think the logic of the if/else branches should be reversed.
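The reviewer's point is that the branch receiving a configuration should forward it to the Java overload, while the no-argument branch should keep the old call. The snippet under review has these the wrong way round. Here is a self-contained sketch of the corrected dispatch. `FakeConfiguration`, `FakeJavaConfiguration`, and `FakeJStreamExecutionEnvironment` are hypothetical stand-ins for the PyFlink wrapper and the class that the real code reaches through `get_gateway()`. The sketch tests `is not None` rather than truthiness, a choice of this sketch so that an empty configuration object is still forwarded.

```python
from typing import Dict, Optional, Tuple


class FakeJavaConfiguration:
    """Hypothetical stand-in for the JVM-side Configuration held by the wrapper."""

    def __init__(self, options: Dict[str, str]):
        self.options = dict(options)


class FakeConfiguration:
    """Hypothetical stand-in for PyFlink's Configuration; keeps a `_j_configuration`."""

    def __init__(self, options: Optional[Dict[str, str]] = None):
        self._j_configuration = FakeJavaConfiguration(options or {})


class FakeJStreamExecutionEnvironment:
    """Hypothetical stand-in for the Java class; reports which overload was called."""

    @staticmethod
    def getExecutionEnvironment(
            j_configuration: Optional[FakeJavaConfiguration] = None
    ) -> Tuple[str, Optional[Dict[str, str]]]:
        if j_configuration is None:
            return ("getExecutionEnvironment()", None)
        return ("getExecutionEnvironment(Configuration)", j_configuration.options)


def get_execution_environment(configuration: Optional[FakeConfiguration] = None):
    JStreamExecutionEnvironment = FakeJStreamExecutionEnvironment
    if configuration is not None:
        # A configuration was given: forward the wrapped Java object to the overload.
        return JStreamExecutionEnvironment.getExecutionEnvironment(
            configuration._j_configuration)
    # No configuration: keep the previous no-argument call.
    return JStreamExecutionEnvironment.getExecutionEnvironment()


print(get_execution_environment())
# ('getExecutionEnvironment()', None)
print(get_execution_environment(FakeConfiguration({"akka.ask.timeout": "20 s"})))
# ('getExecutionEnvironment(Configuration)', {'akka.ask.timeout': '20 s'})
```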




[GitHub] [flink] deadwind4 commented on a diff in pull request #19685: [FLINK-27297][python] Add get_execution_environment method with configuration argument.

deadwind4 commented on code in PR #19685:
URL: https://github.com/apache/flink/pull/19685#discussion_r870886078


##########
flink-python/pyflink/table/tests/test_table_environment_api.py:
##########
@@ -237,12 +237,12 @@ def setUp(self) -> None:
         from pyflink.datastream import StreamExecutionEnvironment
 
         super(DataStreamConversionTestCases, self).setUp()
-        self.env = StreamExecutionEnvironment.get_execution_environment()
+        config = Configuration()
+        config.setString("akka.ask.timeout", "20 s")

Review Comment:
   sorry 




[GitHub] [flink] flinkbot commented on pull request #19685: [FLINK-27297][python] Add get_execution_environment method with configuration argument.

flinkbot commented on PR #19685:
URL: https://github.com/apache/flink/pull/19685#issuecomment-1122193651

   ## CI report:
   
   * 64a7b2f41fb205d6b4ecffe73d2f654520c15907 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>

