Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/12 14:57:39 UTC

[GitHub] [flink] HuangXingBo opened a new pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

HuangXingBo opened a new pull request #14621:
URL: https://github.com/apache/flink/pull/14621


   ## What is the purpose of the change
   
   *This pull request configures Python operators to use managed memory in the Python DataStream API.*
   
   
   ## Brief change log
   
     - *Add the method `setManagedMemory`, which sets all Python operators to use managed memory*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Update the existing test `test_from_data_stream` to cover this feature*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758731111


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1b468d46e941274186770f42412fc0725992f364",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940",
       "triggerID" : "1b468d46e941274186770f42412fc0725992f364",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dab22fa1d5e18201415322cbd928767223258e16",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12026",
       "triggerID" : "dab22fa1d5e18201415322cbd928767223258e16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12100",
       "triggerID" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1a07f8eafb9f951a217ec7622525958a4eeaf333",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12118",
       "triggerID" : "1a07f8eafb9f951a217ec7622525958a4eeaf333",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1100d0e39c512c5a301cf979f38902e9f0a99659",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12143",
       "triggerID" : "1100d0e39c512c5a301cf979f38902e9f0a99659",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1100d0e39c512c5a301cf979f38902e9f0a99659 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12143) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758731111


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1b468d46e941274186770f42412fc0725992f364",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940",
       "triggerID" : "1b468d46e941274186770f42412fc0725992f364",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dab22fa1d5e18201415322cbd928767223258e16",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12026",
       "triggerID" : "dab22fa1d5e18201415322cbd928767223258e16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12100",
       "triggerID" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1a07f8eafb9f951a217ec7622525958a4eeaf333",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12118",
       "triggerID" : "1a07f8eafb9f951a217ec7622525958a4eeaf333",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1100d0e39c512c5a301cf979f38902e9f0a99659",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1100d0e39c512c5a301cf979f38902e9f0a99659",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1a07f8eafb9f951a217ec7622525958a4eeaf333 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12118) 
   * 1100d0e39c512c5a301cf979f38902e9f0a99659 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758731111


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1b468d46e941274186770f42412fc0725992f364",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940",
       "triggerID" : "1b468d46e941274186770f42412fc0725992f364",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dab22fa1d5e18201415322cbd928767223258e16",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12026",
       "triggerID" : "dab22fa1d5e18201415322cbd928767223258e16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12100",
       "triggerID" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * dab22fa1d5e18201415322cbd928767223258e16 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12026) 
   * e0aa9de7e35bc1ac79a3d938454a9b5368445c69 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12100) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758731111


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1b468d46e941274186770f42412fc0725992f364",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940",
       "triggerID" : "1b468d46e941274186770f42412fc0725992f364",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dab22fa1d5e18201415322cbd928767223258e16",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12026",
       "triggerID" : "dab22fa1d5e18201415322cbd928767223258e16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12100",
       "triggerID" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1a07f8eafb9f951a217ec7622525958a4eeaf333",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12118",
       "triggerID" : "1a07f8eafb9f951a217ec7622525958a4eeaf333",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1100d0e39c512c5a301cf979f38902e9f0a99659",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12143",
       "triggerID" : "1100d0e39c512c5a301cf979f38902e9f0a99659",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1a07f8eafb9f951a217ec7622525958a4eeaf333 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12118) 
   * 1100d0e39c512c5a301cf979f38902e9f0a99659 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12143) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] HuangXingBo commented on a change in pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #14621:
URL: https://github.com/apache/flink/pull/14621#discussion_r558174486



##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -215,6 +228,19 @@ private static boolean isPythonOperator(StreamOperatorFactory streamOperatorFact
         }
     }
 
+    private static boolean isPythonOperator(Transformation<?> transform) {
+        if (transform instanceof OneInputTransformation) {
+            return isPythonOperator(((OneInputTransformation) transform).getOperatorFactory());
+        } else if (transform instanceof TwoInputTransformation) {
+            return isPythonOperator(((TwoInputTransformation) transform).getOperatorFactory());
+        } else if (transform instanceof AbstractMultipleInputTransformation) {
+            return isPythonOperator(
+                    ((AbstractMultipleInputTransformation) transform).getOperatorFactory());
+        } else {
+            return false;

Review comment:
       The transformation may be another type of `Transformation`, such as `SourceTransformation`, so we can't throw an exception here.
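
The point of this comment can be illustrated with a minimal, self-contained sketch. It does not use Flink's real classes: `Transformation`, `SourceTransformation`, `OneInputTransformation`, and the string-based `isPythonFactory` check below are hypothetical stand-ins, chosen only to show why a traversal that visits every node needs a `false` fallback rather than an exception.

```java
public class PythonOperatorCheckSketch {
    /** Hypothetical stand-in for Flink's Transformation base type. */
    public interface Transformation {}

    /** Stand-in for a transformation that carries no operator factory. */
    public static final class SourceTransformation implements Transformation {}

    /** Stand-in for a transformation that wraps an operator factory. */
    public static final class OneInputTransformation implements Transformation {
        public final String operatorFactoryName;

        public OneInputTransformation(String operatorFactoryName) {
            this.operatorFactoryName = operatorFactoryName;
        }
    }

    /** Hypothetical check; the real code inspects the operator factory itself. */
    public static boolean isPythonFactory(String factoryName) {
        return factoryName.startsWith("Python");
    }

    public static boolean isPythonOperator(Transformation transform) {
        if (transform instanceof OneInputTransformation) {
            return isPythonFactory(((OneInputTransformation) transform).operatorFactoryName);
        }
        // Sources (and any other type without an operator factory) are valid
        // nodes in the graph, so they are reported as "not Python" instead of
        // being treated as an error.
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isPythonOperator(new SourceTransformation()));
        System.out.println(isPythonOperator(new OneInputTransformation("PythonMapOperator")));
    }
}
```

With a throwing fallback, the first call in `main` would fail even though a source node is a perfectly legal input.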







[GitHub] [flink] flinkbot edited a comment on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758731111


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1b468d46e941274186770f42412fc0725992f364",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940",
       "triggerID" : "1b468d46e941274186770f42412fc0725992f364",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dab22fa1d5e18201415322cbd928767223258e16",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12026",
       "triggerID" : "dab22fa1d5e18201415322cbd928767223258e16",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1b468d46e941274186770f42412fc0725992f364 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940) 
   * dab22fa1d5e18201415322cbd928767223258e16 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12026) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758713252


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 1100d0e39c512c5a301cf979f38902e9f0a99659 (Fri May 28 07:09:43 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] dianfu commented on a change in pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #14621:
URL: https://github.com/apache/flink/pull/14621#discussion_r557882505



##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -270,4 +296,39 @@ private static boolean isExecuteInBatchMode(
         }
         return !existsUnboundedSource;
     }
+
+    public static Configuration getConfig(StreamExecutionEnvironment env, TableConfig tableConfig) {

Review comment:
       What about renaming it to `getMergedConfig`?

##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -270,4 +296,39 @@ private static boolean isExecuteInBatchMode(
         }
         return !existsUnboundedSource;
     }
+
+    public static Configuration getConfig(StreamExecutionEnvironment env, TableConfig tableConfig) {
+        try {
+            StreamExecutionEnvironment readEnv = getRealEnvironment(env);
+            Configuration config =
+                    PythonDependencyUtils.configurePythonDependencies(
+                            readEnv.getCachedFiles(), getMergedConfiguration(readEnv, tableConfig));
+            config.setString("table.exec.timezone", tableConfig.getLocalTimeZone().getId());
+            return config;
+        } catch (NoSuchFieldException
+                | IllegalAccessException
+                | NoSuchMethodException
+                | InvocationTargetException e) {
+            throw new TableException("Method getConfig failed.", e);
+        }
+    }
+
+    private static StreamExecutionEnvironment getRealEnvironment(StreamExecutionEnvironment env)
+            throws NoSuchFieldException, IllegalAccessException {
+        Field realExecEnvField =
+                DummyStreamExecutionEnvironment.class.getDeclaredField("realExecEnv");
+        realExecEnvField.setAccessible(true);
+        while (env instanceof DummyStreamExecutionEnvironment) {
+            env = (StreamExecutionEnvironment) realExecEnvField.get(env);
+        }
+        return env;
+    }
+
+    private static Configuration getMergedConfiguration(

Review comment:
       What about removing this method to avoid confusion with `getMergedConfig`?
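
The `getRealEnvironment` helper in the diff above unwraps nested dummy environments by reading a private field reflectively. A minimal sketch of that pattern follows. `Env` and `DummyEnv` are hypothetical stand-ins for the Flink classes, and, unlike the diff, this version wraps the checked reflection exceptions in an `IllegalStateException` to keep the example short.

```java
import java.lang.reflect.Field;

public class UnwrapEnvSketch {
    /** Hypothetical stand-in for StreamExecutionEnvironment. */
    public static class Env {}

    /** Hypothetical stand-in for a wrapper that hides the real environment. */
    public static class DummyEnv extends Env {
        private final Env realExecEnv;

        public DummyEnv(Env realExecEnv) {
            this.realExecEnv = realExecEnv;
        }
    }

    public static Env getRealEnvironment(Env env) {
        try {
            Field realExecEnvField = DummyEnv.class.getDeclaredField("realExecEnv");
            realExecEnvField.setAccessible(true);
            // Wrappers may be nested, so keep unwrapping until a
            // non-wrapper environment is reached.
            while (env instanceof DummyEnv) {
                env = (Env) realExecEnvField.get(env);
            }
            return env;
        } catch (ReflectiveOperationException e) {
            // Simplification for this sketch; the diff declares the checked exceptions.
            throw new IllegalStateException("Could not unwrap environment", e);
        }
    }

    public static void main(String[] args) {
        Env real = new Env();
        Env wrapped = new DummyEnv(new DummyEnv(real));
        System.out.println(getRealEnvironment(wrapped) == real);
    }
}
```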

##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -215,6 +228,19 @@ private static boolean isPythonOperator(StreamOperatorFactory streamOperatorFact
         }
     }
 
+    private static boolean isPythonOperator(Transformation<?> transform) {
+        if (transform instanceof OneInputTransformation) {
+            return isPythonOperator(((OneInputTransformation) transform).getOperatorFactory());
+        } else if (transform instanceof TwoInputTransformation) {
+            return isPythonOperator(((TwoInputTransformation) transform).getOperatorFactory());
+        } else if (transform instanceof AbstractMultipleInputTransformation) {
+            return isPythonOperator(
+                    ((AbstractMultipleInputTransformation) transform).getOperatorFactory());
+        } else {
+            return false;

Review comment:
       We should throw an exception in this case, as it means that we are missing something.







[GitHub] [flink] flinkbot edited a comment on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758731111


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1b468d46e941274186770f42412fc0725992f364",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940",
       "triggerID" : "1b468d46e941274186770f42412fc0725992f364",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dab22fa1d5e18201415322cbd928767223258e16",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12026",
       "triggerID" : "dab22fa1d5e18201415322cbd928767223258e16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12100",
       "triggerID" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1a07f8eafb9f951a217ec7622525958a4eeaf333",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12118",
       "triggerID" : "1a07f8eafb9f951a217ec7622525958a4eeaf333",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1a07f8eafb9f951a217ec7622525958a4eeaf333 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12118) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dianfu commented on a change in pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #14621:
URL: https://github.com/apache/flink/pull/14621#discussion_r557003451



##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -1736,6 +1736,9 @@ def from_data_stream(self, data_stream: DataStream, *fields: Union[str, Expressi
         .. versionadded:: 1.12.0
         """
         j_data_stream = data_stream._j_data_stream
+        get_gateway().jvm \
+            .org.apache.flink.python.util.PythonConfigUtil.setManagedMemory(
+            j_data_stream.getTransformation(), self._j_tenv.getConfig().getConfiguration())

Review comment:
       The configuration may be set via flink-conf.yaml, in which case it's not available in `self._j_tenv.getConfig().getConfiguration()`.
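
The concern here is that an option set only at the cluster level is invisible when just the table-level configuration is consulted. A rough sketch of one way to address it is to merge both sources before reading the flag. Everything below is an assumption for illustration: plain maps stand in for Flink's `Configuration`, the key string is assumed to be the one behind `PythonOptions.USE_MANAGED_MEMORY`, and letting table-level values override environment-level ones is an assumed precedence, not something the thread confirms.

```java
import java.util.HashMap;
import java.util.Map;

public class MergedConfigSketch {
    // Assumed key name for the managed-memory option; treat it as hypothetical.
    public static final String USE_MANAGED_MEMORY = "python.fn-execution.memory.managed";

    /** Merges two configs; table-level entries override environment-level ones (assumed precedence). */
    public static Map<String, String> merge(
            Map<String, String> envConfig, Map<String, String> tableConfig) {
        Map<String, String> merged = new HashMap<>(envConfig);
        merged.putAll(tableConfig);
        return merged;
    }

    public static boolean getBoolean(Map<String, String> config, String key, boolean defaultValue) {
        String value = config.get(key);
        return value == null ? defaultValue : Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        // The option is set only in the environment-level config
        // (for example, loaded from flink-conf.yaml).
        Map<String, String> envConfig = new HashMap<>();
        envConfig.put(USE_MANAGED_MEMORY, "true");
        Map<String, String> tableConfig = new HashMap<>();

        System.out.println(getBoolean(tableConfig, USE_MANAGED_MEMORY, false));
        System.out.println(getBoolean(merge(envConfig, tableConfig), USE_MANAGED_MEMORY, false));
    }
}
```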

##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -215,6 +221,20 @@ private static boolean isPythonOperator(StreamOperatorFactory streamOperatorFact
         }
     }
 
+    private static boolean isPythonOperator(Transformation<?> transform) {
+        if (transform instanceof OneInputTransformation

Review comment:
       What about refactoring it a bit to make it more readable?
   ```
   if (transform instanceof OneInputTransformation) {
       return isPythonOperator(((OneInputTransformation) transform).getOperatorFactory());
   } else if (transform instanceof TwoInputTransformation) {
       return isPythonOperator(((TwoInputTransformation) transform).getOperatorFactory());
   } else {
       Preconditions.checkState(transform instanceof AbstractMultipleInputTransformation);
       return isPythonOperator(
               ((AbstractMultipleInputTransformation) transform).getOperatorFactory());
   }
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -128,6 +128,23 @@ private static void chainStreamNode(
         firstStream.setSlotSharingGroup(secondStream.getSlotSharingGroup());
     }
 
+    /** Set Python Operator Use Managed Memory. */
+    public static void setManagedMemory(Transformation<?> transformation, Configuration config) {
+        if (config.getBoolean(PythonOptions.USE_MANAGED_MEMORY)) {
+            setManagedMemory(transformation);
+        }
+    }
+
+    private static void setManagedMemory(Transformation<?> transformation) {
+        List<Transformation<?>> inputTransformations = transformation.getInputs();

Review comment:
       nit: what about moving this line to just before `for (Transformation inputTransformation : inputTransformations)`, where it's used?
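
The diff only shows the first lines of `setManagedMemory`, so the following is a sketch of the general idea rather than the PR's actual implementation: walk the transformation graph through its inputs and mark every Python operator. `Node`, its `pythonOperator` flag, and the `usesManagedMemory` field are hypothetical stand-ins; the boolean parameter plays the role of the `PythonOptions.USE_MANAGED_MEMORY` check.

```java
import java.util.Arrays;
import java.util.List;

public class ManagedMemorySketch {
    /** Hypothetical stand-in for a Transformation node in the job graph. */
    public static class Node {
        public final String name;
        public final boolean pythonOperator;
        public final List<Node> inputs;
        public boolean usesManagedMemory = false;

        public Node(String name, boolean pythonOperator, Node... inputs) {
            this.name = name;
            this.pythonOperator = pythonOperator;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Entry point: does nothing unless the option is enabled. */
    public static void setManagedMemory(Node node, boolean useManagedMemory) {
        if (useManagedMemory) {
            setManagedMemory(node);
        }
    }

    private static void setManagedMemory(Node node) {
        if (node.pythonOperator) {
            node.usesManagedMemory = true;
        }
        // Fetch the inputs right where they are iterated, then recurse upstream.
        for (Node input : node.inputs) {
            setManagedMemory(input);
        }
    }

    public static void main(String[] args) {
        Node source = new Node("source", false);
        Node pythonMap = new Node("python-map", true, source);
        Node sink = new Node("sink", false, pythonMap);

        setManagedMemory(sink, true);
        for (Node n : Arrays.asList(source, pythonMap, sink)) {
            System.out.println(n.name + " usesManagedMemory=" + n.usesManagedMemory);
        }
    }
}
```

This sketch assumes the graph is a tree; if several downstream nodes share an input, that input is simply visited more than once, which is harmless here because the marking is idempotent.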







[GitHub] [flink] dianfu closed pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
dianfu closed pull request #14621:
URL: https://github.com/apache/flink/pull/14621


   





[GitHub] [flink] flinkbot edited a comment on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758731111


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1b468d46e941274186770f42412fc0725992f364",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940",
       "triggerID" : "1b468d46e941274186770f42412fc0725992f364",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1b468d46e941274186770f42412fc0725992f364 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758713252


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 1b468d46e941274186770f42412fc0725992f364 (Tue Jan 12 15:00:45 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-20933).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758731111


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1b468d46e941274186770f42412fc0725992f364",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940",
       "triggerID" : "1b468d46e941274186770f42412fc0725992f364",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dab22fa1d5e18201415322cbd928767223258e16",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12026",
       "triggerID" : "dab22fa1d5e18201415322cbd928767223258e16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * dab22fa1d5e18201415322cbd928767223258e16 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12026) 
   * e0aa9de7e35bc1ac79a3d938454a9b5368445c69 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758731111


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1b468d46e941274186770f42412fc0725992f364",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940",
       "triggerID" : "1b468d46e941274186770f42412fc0725992f364",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dab22fa1d5e18201415322cbd928767223258e16",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12026",
       "triggerID" : "dab22fa1d5e18201415322cbd928767223258e16",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * dab22fa1d5e18201415322cbd928767223258e16 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12026) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758731111


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1b468d46e941274186770f42412fc0725992f364",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940",
       "triggerID" : "1b468d46e941274186770f42412fc0725992f364",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dab22fa1d5e18201415322cbd928767223258e16",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12026",
       "triggerID" : "dab22fa1d5e18201415322cbd928767223258e16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12100",
       "triggerID" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1a07f8eafb9f951a217ec7622525958a4eeaf333",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1a07f8eafb9f951a217ec7622525958a4eeaf333",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e0aa9de7e35bc1ac79a3d938454a9b5368445c69 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12100) 
   * 1a07f8eafb9f951a217ec7622525958a4eeaf333 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758731111


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1b468d46e941274186770f42412fc0725992f364",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940",
       "triggerID" : "1b468d46e941274186770f42412fc0725992f364",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dab22fa1d5e18201415322cbd928767223258e16",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "dab22fa1d5e18201415322cbd928767223258e16",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1b468d46e941274186770f42412fc0725992f364 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940) 
   * dab22fa1d5e18201415322cbd928767223258e16 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758731111


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1b468d46e941274186770f42412fc0725992f364",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940",
       "triggerID" : "1b468d46e941274186770f42412fc0725992f364",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dab22fa1d5e18201415322cbd928767223258e16",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12026",
       "triggerID" : "dab22fa1d5e18201415322cbd928767223258e16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12100",
       "triggerID" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1a07f8eafb9f951a217ec7622525958a4eeaf333",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12118",
       "triggerID" : "1a07f8eafb9f951a217ec7622525958a4eeaf333",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e0aa9de7e35bc1ac79a3d938454a9b5368445c69 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12100) 
   * 1a07f8eafb9f951a217ec7622525958a4eeaf333 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12118) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758731111


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1b468d46e941274186770f42412fc0725992f364",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940",
       "triggerID" : "1b468d46e941274186770f42412fc0725992f364",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1b468d46e941274186770f42412fc0725992f364 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758731111


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1b468d46e941274186770f42412fc0725992f364",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1b468d46e941274186770f42412fc0725992f364",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1b468d46e941274186770f42412fc0725992f364 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14621:
URL: https://github.com/apache/flink/pull/14621#issuecomment-758731111


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1b468d46e941274186770f42412fc0725992f364",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11940",
       "triggerID" : "1b468d46e941274186770f42412fc0725992f364",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dab22fa1d5e18201415322cbd928767223258e16",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12026",
       "triggerID" : "dab22fa1d5e18201415322cbd928767223258e16",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12100",
       "triggerID" : "e0aa9de7e35bc1ac79a3d938454a9b5368445c69",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e0aa9de7e35bc1ac79a3d938454a9b5368445c69 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12100) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

