Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/14 03:00:49 UTC

[GitHub] [flink] dianfu commented on a change in pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream

dianfu commented on a change in pull request #14621:
URL: https://github.com/apache/flink/pull/14621#discussion_r557003451



##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -1736,6 +1736,9 @@ def from_data_stream(self, data_stream: DataStream, *fields: Union[str, Expressi
         .. versionadded:: 1.12.0
         """
         j_data_stream = data_stream._j_data_stream
+        get_gateway().jvm \
+            .org.apache.flink.python.util.PythonConfigUtil.setManagedMemory(
+            j_data_stream.getTransformation(), self._j_tenv.getConfig().getConfiguration())

Review comment:
       The option may be configured via flink-conf.yaml, in which case it's not available in self._j_tenv.getConfig().getConfiguration().
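
       A minimal sketch of the concern, using plain maps as stand-ins for the two configuration sources. This is not Flink's `Configuration` API, the class and method names are hypothetical, and the key string is assumed to be the one behind `PythonOptions.USE_MANAGED_MEMORY`. Reading only the table-level settings misses a value set cluster-wide, while a lookup that falls back to the cluster-level settings sees it:

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical illustration only; plain maps stand in for the two config sources.
   class EffectiveConfigSketch {
       // Assumed key for PythonOptions.USE_MANAGED_MEMORY.
       static final String USE_MANAGED_MEMORY_KEY = "python.fn-execution.memory.managed";

       // Table-level settings take precedence; cluster-level settings
       // (e.g. loaded from flink-conf.yaml) act as the fallback.
       static boolean useManagedMemory(
               Map<String, String> tableConfig,
               Map<String, String> clusterConfig,
               boolean defaultValue) {
           Map<String, String> merged = new HashMap<>(clusterConfig);
           merged.putAll(tableConfig);
           String raw = merged.get(USE_MANAGED_MEMORY_KEY);
           return raw == null ? defaultValue : Boolean.parseBoolean(raw);
       }

       public static void main(String[] args) {
           Map<String, String> cluster =
                   Collections.singletonMap(USE_MANAGED_MEMORY_KEY, "true");
           Map<String, String> table = Collections.emptyMap();

           // Looking at the table-level settings alone does not see the cluster value.
           System.out.println(
                   useManagedMemory(table, Collections.<String, String>emptyMap(), false));
           // Falling back to the cluster-level settings does.
           System.out.println(useManagedMemory(table, cluster, false));
       }
   }
   ```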

##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -215,6 +221,20 @@ private static boolean isPythonOperator(StreamOperatorFactory streamOperatorFact
         }
     }
 
+    private static boolean isPythonOperator(Transformation<?> transform) {
+        if (transform instanceof OneInputTransformation

Review comment:
       What about refactoring it a bit to make it more readable?
   ```
   if (transform instanceof OneInputTransformation) {
       return isPythonOperator(((OneInputTransformation) transform).getOperatorFactory());
   } else if (transform instanceof TwoInputTransformation) {
       return isPythonOperator(((TwoInputTransformation) transform).getOperatorFactory());
   } else {
       Preconditions.checkState(transform instanceof AbstractMultipleInputTransformation);
       return isPythonOperator(
               ((AbstractMultipleInputTransformation) transform).getOperatorFactory());
   }
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
##########
@@ -128,6 +128,23 @@ private static void chainStreamNode(
         firstStream.setSlotSharingGroup(secondStream.getSlotSharingGroup());
     }
 
+    /** Set Python Operator Use Managed Memory. */
+    public static void setManagedMemory(Transformation<?> transformation, Configuration config) {
+        if (config.getBoolean(PythonOptions.USE_MANAGED_MEMORY)) {
+            setManagedMemory(transformation);
+        }
+    }
+
+    private static void setManagedMemory(Transformation<?> transformation) {
+        List<Transformation<?>> inputTransformations = transformation.getInputs();

Review comment:
       nit: what about moving this line to just before `for (Transformation inputTransformation : inputTransformations)`, where it's used?
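
       A rough sketch of the shape this would give the method, assuming it marks the current transformation and then recurses into its inputs (the rest of the method is not shown in this hunk, so that structure is a guess). `SketchTransformation` and its fields are hypothetical stand-ins, not Flink's `Transformation`:

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical stand-in for a transformation node; not Flink's Transformation class.
   class SketchTransformation {
       final String name;
       final boolean pythonOperator;
       final List<SketchTransformation> inputs = new ArrayList<>();
       boolean usesManagedMemory = false;

       SketchTransformation(String name, boolean pythonOperator, SketchTransformation... inputs) {
           this.name = name;
           this.pythonOperator = pythonOperator;
           this.inputs.addAll(Arrays.asList(inputs));
       }

       List<SketchTransformation> getInputs() {
           return inputs;
       }
   }

   class ManagedMemoryWalkSketch {
       // Marks every Python operator reachable from the given node.
       static void setManagedMemory(SketchTransformation transformation) {
           if (transformation.pythonOperator) {
               transformation.usesManagedMemory = true;
           }
           // Declared right before the loop that consumes it.
           List<SketchTransformation> inputTransformations = transformation.getInputs();
           for (SketchTransformation inputTransformation : inputTransformations) {
               setManagedMemory(inputTransformation);
           }
       }

       public static void main(String[] args) {
           SketchTransformation source = new SketchTransformation("source", false);
           SketchTransformation pythonMap = new SketchTransformation("python-map", true, source);
           SketchTransformation sink = new SketchTransformation("sink", false, pythonMap);

           setManagedMemory(sink);
           System.out.println(pythonMap.name + " uses managed memory: " + pythonMap.usesManagedMemory);
       }
   }
   ```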



