You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hx...@apache.org on 2022/03/21 08:46:55 UTC

[flink] branch release-1.15 updated: [FLINK-26727][python] Fix the implementation of sub-interpreter in Thread Mode

This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 1f60be8  [FLINK-26727][python] Fix the implementation of sub-interpreter in Thread Mode
1f60be8 is described below

commit 1f60be8a39b4130de7e0e588d856623858d7b57e
Author: huangxingbo <hx...@gmail.com>
AuthorDate: Fri Mar 18 18:12:40 2022 +0800

    [FLINK-26727][python] Fix the implementation of sub-interpreter in Thread Mode
    
    This closes #19150.
---
 .../docs/dev/python/python_execution_mode.md       | 57 ++++++--------
 .../docs/dev/python/python_execution_mode.md       | 57 ++++++--------
 .../shortcodes/generated/python_configuration.html |  2 +-
 .../pyflink/table/tests/test_dependency.py         | 15 +---
 flink-python/pyflink/table/tests/test_udf.py       | 15 +---
 .../org/apache/flink/python/PythonOptions.java     |  9 +--
 .../embedded/EmbeddedPythonEnvironmentManager.java | 16 +---
 .../AbstractEmbeddedPythonFunctionOperator.java    |  8 +-
 .../EmbeddedPythonScalarFunctionOperator.java      | 90 ++++------------------
 9 files changed, 75 insertions(+), 194 deletions(-)

diff --git a/docs/content.zh/docs/dev/python/python_execution_mode.md b/docs/content.zh/docs/dev/python/python_execution_mode.md
index c34da24..4bb3eb1 100644
--- a/docs/content.zh/docs/dev/python/python_execution_mode.md
+++ b/docs/content.zh/docs/dev/python/python_execution_mode.md
@@ -31,33 +31,24 @@ defines how to execute your customized Python functions.
 Prior to release-1.15, there is the only execution mode called `PROCESS` execution mode. The `PROCESS`
 mode means that the Python user-defined functions will be executed in separate Python processes.
 
-In release-1.15, it has introduced another two execution modes called `MULTI-THREAD` execution mode and
-`SUB-INTERPRETER` execution mode. The `MULTI-THREAD` mode means that the Python user-defined functions
-will be executed in the same thread as Java Operator, but it will be affected by GIL performance.
-The `SUB-INTERPRETER` mode means that the Python user-defined functions will be executed in Python
-different sub-interpreters rather than different threads of one interpreter, which can largely overcome
-the effects of the GIL, but some CPython extensions libraries doesn't support it, such as numpy, tensorflow, etc.
+In release-1.15, it has introduced a new execution mode called `THREAD` execution mode. The `THREAD`
+mode means that the Python user-defined functions will be executed in the same process as Java Operator,
+It should be noted that multiple Python user-defined functions running in the same JVM are still affected by GIL.
 
-## When can/should I use MULTI-THREAD execution mode or SUB-INTERPRETER execution mode?
+## When can/should I use THREAD execution mode?
 
-The purpose of the introduction of `MULTI-THREAD` mode and `SUB-INTERPRETER` mode is to overcome the
-overhead of serialization/deserialization and network communication caused in `PROCESS` mode.
-So if performance is not your concern, or the computing logic of your customized Python functions is
-the performance bottleneck of the job, `PROCESS` mode will be the best choice as `PROCESS` mode provides
-the best isolation compared to `MULTI-THREAD` mode and `SUB-INTERPRETER` mode.
-
-Compared to `MULTI-THREAD` execution mode, `SUB-INTERPRETER` execution mode can largely overcome the
-effects of the GIL, so you can get better performance usually. However, `SUB-INTERPRETER` may fail in some CPython
-extensions libraries, such as numpy, tensorflow. In this case, you should use `PROCESS` mode or `MULTI-THREAD` mode.
+The purpose of the introduction of `THREAD` mode is to overcome the overhead of serialization/deserialization
+and network communication caused in `PROCESS` mode. So if performance is not your concern, or the computing
+logic of your customized Python functions is the performance bottleneck of the job, `PROCESS` mode will
+be the best choice as `PROCESS` mode provides the best isolation compared to `THREAD` mode.
 
 ## Configuring Python execution mode
 
 The execution mode can be configured via the `python.execution-mode` setting.
-There are three possible values:
+There are two possible values:
 
  - `PROCESS`: The Python user-defined functions will be executed in separate Python process. (default)
- - `MULTI-THREAD`: The Python user-defined functions will be executed in the same thread as Java Operator.
- - `SUB-INTERPRETER`: The Python user-defined functions will be executed in Python different sub-interpreters.
+ - `THREAD`: The Python user-defined functions will be executed in the same process as Java operator.
 
 You could specify the Python execution mode using Python Table API as following:
 
@@ -65,27 +56,23 @@ You could specify the Python execution mode using Python Table API as following:
 # Specify `PROCESS` mode
 table_env.get_config().get_configuration().set_string("python.execution-mode", "process")
 
-# Specify `MULTI-THREAD` mode
-table_env.get_config().get_configuration().set_string("python.execution-mode", "multi-thread")
-
-# Specify `SUB-INTERPRETER` mode
-table_env.get_config().get_configuration().set_string("python.execution-mode", "sub-interpreter")
+# Specify `THREAD` mode
+table_env.get_config().get_configuration().set_string("python.execution-mode", "thread")
 ```
 
 {{< hint info >}}
-Currently, it still doesn't support to execute Python UDFs in `MULTI-THREAD` and `SUB-INTERPRETER` execution mode
-in all places. It will fall back to `PROCESS` execution mode in these cases. So it may happen that you configure a job
-to execute in `MULTI-THREAD` or `SUB-INTERPRETER` execution modes, however, it's actually executed in `PROCESS` execution mode.
+Currently, it still doesn't support to execute Python UDFs in `THREAD` execution mode in all places.
+It will fall back to `PROCESS` execution mode in these cases. So it may happen that you configure a job
+to execute in `THREAD` execution mode, however, it's actually executed in `PROCESS` execution mode.
 {{< /hint >}}
 {{< hint info >}}
-`MULTI-THREAD` execution mode only supports Python 3.7+. `SUB-INTERPRETER` execution mode only supports Python 3.8+.  
+`THREAD` execution mode is only supported in Python 3.7+.
 {{< /hint >}}
 
 ## Execution Behavior
 
-This section provides an overview of the execution behavior of `MULTI-THREAD` and `SUB-INTERPRETER`
-execution mode and contrasts they with `PROCESS` execution mode. For more
-details, please refer to the FLIP that introduced this feature:
+This section provides an overview of the execution behavior of `THREAD` execution mode and contrasts
+they with `PROCESS` execution mode. For more details, please refer to the FLIP that introduced this feature:
 [FLIP-206](https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode).
 
 #### PROCESS Execution Mode
@@ -95,10 +82,10 @@ The Java operator process communicates with the Python worker process using vari
 
 {{< img src="/fig/pyflink_process_execution_mode.png" alt="Process Execution Mode" >}}
 
-#### MULTI-THREAD and SUB-INTERPRETER Execution Mode
+#### THREAD Execution Mode
 
-In `MULTI-THREAD` and `SUB-INTERPRETER` execution mode, the Python user-defined functions will be executed in
-the same process as Java operators. PyFlink takes use of third part library [PEMJA](https://github.com/alibaba/pemja) to
-embed Python in Java Application.
+In `THREAD` execution mode, the Python user-defined functions will be executed in the same process
+as Java operators. PyFlink takes use of third part library [PEMJA](https://github.com/alibaba/pemja)
+to embed Python in Java Application.
 
 {{< img src="/fig/pyflink_embedded_execution_mode.png" alt="Embedded Execution Mode" >}}
diff --git a/docs/content/docs/dev/python/python_execution_mode.md b/docs/content/docs/dev/python/python_execution_mode.md
index 2c96958..4f5ecf2 100644
--- a/docs/content/docs/dev/python/python_execution_mode.md
+++ b/docs/content/docs/dev/python/python_execution_mode.md
@@ -31,33 +31,24 @@ defines how to execute your customized Python functions.
 Prior to release-1.15, there is the only execution mode called `PROCESS` execution mode. The `PROCESS`
 mode means that the Python user-defined functions will be executed in separate Python processes.
 
-In release-1.15, it has introduced another two execution modes called `MULTI-THREAD` execution mode and
-`SUB-INTERPRETER` execution mode. The `MULTI-THREAD` mode means that the Python user-defined functions
-will be executed in the same thread as Java Operator, but it will be affected by GIL performance.
-The `SUB-INTERPRETER` mode means that the Python user-defined functions will be executed in Python
-different sub-interpreters rather than different threads of one interpreter, which can largely overcome
-the effects of the GIL, but some CPython extensions libraries doesn't support it, such as numpy, tensorflow, etc.
+In release-1.15, it has introduced a new execution mode called `THREAD` execution mode. The `THREAD`
+mode means that the Python user-defined functions will be executed in the same process as Java Operator,
+It should be noted that multiple Python user-defined functions running in the same JVM are still affected by GIL.
 
-## When can/should I use MULTI-THREAD execution mode or SUB-INTERPRETER execution mode?
+## When can/should I use THREAD execution mode?
 
-The purpose of the introduction of `MULTI-THREAD` mode and `SUB-INTERPRETER` mode is to overcome the
-overhead of serialization/deserialization and network communication caused in `PROCESS` mode.
-So if performance is not your concern, or the computing logic of your customized Python functions is
-the performance bottleneck of the job, `PROCESS` mode will be the best choice as `PROCESS` mode provides
-the best isolation compared to `MULTI-THREAD` mode and `SUB-INTERPRETER` mode.
-
-Compared to `MULTI-THREAD` execution mode, `SUB-INTERPRETER` execution mode can largely overcome the
-effects of the GIL, so you can get better performance usually. However, `SUB-INTERPRETER` may fail in some CPython
-extensions libraries, such as numpy, tensorflow. In this case, you should use `PROCESS` mode or `MULTI-THREAD` mode.
+The purpose of the introduction of `THREAD` mode is to overcome the overhead of serialization/deserialization
+and network communication caused in `PROCESS` mode. So if performance is not your concern, or the computing
+logic of your customized Python functions is the performance bottleneck of the job, `PROCESS` mode will
+be the best choice as `PROCESS` mode provides the best isolation compared to `THREAD` mode.
 
 ## Configuring Python execution mode
 
 The execution mode can be configured via the `python.execution-mode` setting.
-There are three possible values:
+There are two possible values:
 
  - `PROCESS`: The Python user-defined functions will be executed in separate Python process. (default)
- - `MULTI-THREAD`: The Python user-defined functions will be executed in the same thread as Java Operator.
- - `SUB-INTERPRETER`: The Python user-defined functions will be executed in Python different sub-interpreters.
+ - `THREAD`: The Python user-defined functions will be executed in the same process as Java operator.
 
 You could specify the Python execution mode using Python Table API as following:
 
@@ -65,27 +56,23 @@ You could specify the Python execution mode using Python Table API as following:
 # Specify `PROCESS` mode
 table_env.get_config().get_configuration().set_string("python.execution-mode", "process")
 
-# Specify `MULTI-THREAD` mode
-table_env.get_config().get_configuration().set_string("python.execution-mode", "multi-thread")
-
-# Specify `SUB-INTERPRETER` mode
-table_env.get_config().get_configuration().set_string("python.execution-mode", "sub-interpreter")
+# Specify `THREAD` mode
+table_env.get_config().get_configuration().set_string("python.execution-mode", "thread")
 ```
 
 {{< hint info >}}
-Currently, it still doesn't support to execute Python UDFs in `MULTI-THREAD` and `SUB-INTERPRETER` execution mode
-in all places. It will fall back to `PROCESS` execution mode in these cases. So it may happen that you configure a job
-to execute in `MULTI-THREAD` or `SUB-INTERPRETER` execution modes, however, it's actually executed in `PROCESS` execution mode.
+Currently, it still doesn't support to execute Python UDFs in `THREAD` execution mode in all places.
+It will fall back to `PROCESS` execution mode in these cases. So it may happen that you configure a job
+to execute in `THREAD` execution mode, however, it's actually executed in `PROCESS` execution mode.
 {{< /hint >}}
 {{< hint info >}}
-`MULTI-THREAD` execution mode only supports Python 3.7+. `SUB-INTERPRETER` execution mode only supports Python 3.8+.  
+`THREAD` execution mode is only supported in Python 3.7+.
 {{< /hint >}}
 
 ## Execution Behavior
 
-This section provides an overview of the execution behavior of `MULTI-THREAD` and `SUB-INTERPRETER`
-execution mode and contrasts they with `PROCESS` execution mode. For more
-details, please refer to the FLIP that introduced this feature:
+This section provides an overview of the execution behavior of `THREAD` execution mode and contrasts
+they with `PROCESS` execution mode. For more details, please refer to the FLIP that introduced this feature:
 [FLIP-206](https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode).
 
 #### PROCESS Execution Mode
@@ -95,10 +82,10 @@ The Java operator process communicates with the Python worker process using vari
 
 {{< img src="/fig/pyflink_process_execution_mode.png" alt="Process Execution Mode" >}}
 
-#### MULTI-THREAD and SUB-INTERPRETER Execution Mode
+#### THREAD Execution Mode
 
-In `MULTI-THREAD` and `SUB-INTERPRETER` execution mode, the Python user-defined functions will be executed in
-the same process as Java operators. PyFlink takes use of third part library [PEMJA](https://github.com/alibaba/pemja) to
-embed Python in Java Application.
+In `THREAD` execution mode, the Python user-defined functions will be executed in the same process
+as Java operators. PyFlink takes use of third part library [PEMJA](https://github.com/alibaba/pemja)
+to embed Python in Java Application.
 
 {{< img src="/fig/pyflink_embedded_execution_mode.png" alt="Embedded Execution Mode" >}}
diff --git a/docs/layouts/shortcodes/generated/python_configuration.html b/docs/layouts/shortcodes/generated/python_configuration.html
index 9ad42d8..cdaabcf 100644
--- a/docs/layouts/shortcodes/generated/python_configuration.html
+++ b/docs/layouts/shortcodes/generated/python_configuration.html
@@ -30,7 +30,7 @@
             <td><h5>python.execution-mode</h5></td>
             <td style="word-wrap: break-word;">"process"</td>
             <td>String</td>
-            <td>Specify the python runtime execution mode. The optional values are `process`, `multi-thread` and `sub-interpreter`. The `process` mode means that the Python user-defined functions will be executed in separate Python process. The `multi-thread` mode means that the Python user-defined functions will be executed in the same thread as Java Operator, but it will be affected by GIL performance. The `sub-interpreter` mode means that the Python user-defined functions will be exec [...]
+            <td>Specify the python runtime execution mode. The optional values are `process` and `thread`. The `process` mode means that the Python user-defined functions will be executed in separate Python process. The `thread` mode means that the Python user-defined functions will be executed in the same process of the Java operator. Note that currently it still doesn't support to execute Python user-defined functions in `thread` mode in all places. It will fall back to `process` mode  [...]
         </tr>
         <tr>
             <td><h5>python.files</h5></td>
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index b180c72..5c4ae84 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -93,19 +93,10 @@ class DependencyTests(object):
 
 
 @pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
-class EmbeddedMultiThreadDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
+class EmbeddedThreadDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
     def setUp(self):
-        super(EmbeddedMultiThreadDependencyTests, self).setUp()
-        self.t_env.get_config().get_configuration().set_string("python.execution-mode",
-                                                               "multi-thread")
-
-
-@pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8")
-class EmbeddedSubInterpreterDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
-    def setUp(self):
-        super(EmbeddedSubInterpreterDependencyTests, self).setUp()
-        self.t_env.get_config().get_configuration().set_string("python.execution-mode",
-                                                               "sub-interpreter")
+        super(EmbeddedThreadDependencyTests, self).setUp()
+        self.t_env.get_config().get_configuration().set_string("python.execution-mode", "thread")
 
 
 class BatchDependencyTests(DependencyTests, PyFlinkBatchTableTestCase):
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 6abdee2..3e7562a 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -797,19 +797,10 @@ class PyFlinkBatchUserDefinedFunctionTests(UserDefinedFunctionTests,
 
 
 @pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
-class PyFlinkEmbeddedMultiThreadTests(UserDefinedFunctionTests, PyFlinkBatchTableTestCase):
+class PyFlinkEmbeddedThreadTests(UserDefinedFunctionTests, PyFlinkBatchTableTestCase):
     def setUp(self):
-        super(PyFlinkEmbeddedMultiThreadTests, self).setUp()
-        self.t_env.get_config().get_configuration().set_string("python.execution-mode",
-                                                               "multi-thread")
-
-
-@pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8")
-class PyFlinkEmbeddedSubInterpreterTests(UserDefinedFunctionTests, PyFlinkBatchTableTestCase):
-    def setUp(self):
-        super(PyFlinkEmbeddedSubInterpreterTests, self).setUp()
-        self.t_env.get_config().get_configuration().set_string("python.execution-mode",
-                                                               "sub-interpreter")
+        super(PyFlinkEmbeddedThreadTests, self).setUp()
+        self.t_env.get_config().get_configuration().set_string("python.execution-mode", "thread")
 
 
 # test specify the input_types
diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index 649e2e6..78cd3b5 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -231,10 +231,9 @@ public class PythonOptions {
                     .stringType()
                     .defaultValue("process")
                     .withDescription(
-                            "Specify the python runtime execution mode. The optional values are `process`, `multi-thread` and `sub-interpreter`. "
+                            "Specify the python runtime execution mode. The optional values are `process` and `thread`. "
                                     + "The `process` mode means that the Python user-defined functions will be executed in separate Python process. "
-                                    + "The `multi-thread` mode means that the Python user-defined functions will be executed in the same thread as Java Operator, but it will be affected by GIL performance. "
-                                    + "The `sub-interpreter` mode means that the Python user-defined functions will be executed in python different sub-interpreters rather than different threads of one interpreter, "
-                                    + "which can largely overcome the effects of the GIL, but it maybe fail in some CPython extensions libraries, such as numpy, tensorflow. "
-                                    + "Note that if the python operator dose not support `multi-thread` and `sub-interpreter` mode, we will still use `process` mode.");
+                                    + "The `thread` mode means that the Python user-defined functions will be executed in the same process of the Java operator. "
+                                    + "Note that currently it still doesn't support to execute Python user-defined functions in `thread` mode in all places. "
+                                    + "It will fall back to `process` mode in these cases.");
 }
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
index 1d7e9a0..750284e 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
@@ -54,9 +54,7 @@ public class EmbeddedPythonEnvironmentManager extends AbstractPythonEnvironmentM
 
         String executionMode = dependencyInfo.getExecutionMode();
 
-        if (executionMode.equalsIgnoreCase("sub-interpreter")) {
-            execType = PythonInterpreterConfig.ExecType.SUB_INTERPRETER;
-        } else if (executionMode.equalsIgnoreCase("multi-thread")) {
+        if (executionMode.equalsIgnoreCase("thread")) {
             execType = PythonInterpreterConfig.ExecType.MULTI_THREAD;
         } else {
             throw new RuntimeException(
@@ -66,16 +64,8 @@ public class EmbeddedPythonEnvironmentManager extends AbstractPythonEnvironmentM
         String pythonVersion =
                 PythonEnvironmentManagerUtils.getPythonVersion(dependencyInfo.getPythonExec());
 
-        if (execType == PythonInterpreterConfig.ExecType.SUB_INTERPRETER) {
-            if (pythonVersion.compareTo("3.8") < 0) {
-                throw new RuntimeException(
-                        "`SUB-INTERPRETER` execution mode only supports Python 3.8+");
-            }
-        } else {
-            if (pythonVersion.compareTo("3.7") < 0) {
-                throw new RuntimeException(
-                        "`MULTI-THREAD` execution mode only supports Python 3.7+");
-            }
+        if (pythonVersion.compareTo("3.7") < 0) {
+            throw new RuntimeException("`THREAD` execution mode only supports Python 3.7+");
         }
 
         if (env.containsKey("FLINK_TESTING")) {
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
index f23e3d2..be709e6 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
@@ -105,7 +105,7 @@ public abstract class AbstractEmbeddedPythonFunctionOperator<OUT>
             }
         }
 
-        openPythonInterpreter(pythonConfig.getPythonExec(), env, interpreterConfig.getExecType());
+        openPythonInterpreter(pythonConfig.getPythonExec(), env);
     }
 
     @Override
@@ -152,11 +152,7 @@ public abstract class AbstractEmbeddedPythonFunctionOperator<OUT>
     }
 
     /** Setup method for Python Interpreter. It can be used for initialization work. */
-    public abstract void openPythonInterpreter(
-            String pythonExecutable,
-            Map<String, String> env,
-            PythonInterpreterConfig.ExecType execType)
-            throws Exception;
+    public abstract void openPythonInterpreter(String pythonExecutable, Map<String, String> env);
 
     /** Returns the {@link PythonEnv} used to create PythonEnvironmentManager. */
     public abstract PythonEnv getPythonEnv();
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
index 848b678..3c843cd 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
@@ -40,13 +40,6 @@ import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
-import pemja.core.PythonInterpreterConfig;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -156,46 +149,21 @@ public class EmbeddedPythonScalarFunctionOperator
     }
 
     @Override
-    public void openPythonInterpreter(
-            String pythonExecutable,
-            Map<String, String> env,
-            PythonInterpreterConfig.ExecType execType)
-            throws Exception {
-        if (execType.equals(PythonInterpreterConfig.ExecType.SUB_INTERPRETER)) {
-            LOG.info("Create Operation in sub-interpreters.");
-            String[] commands =
-                    new String[] {
-                        pythonExecutable,
-                        "-c",
-                        String.format(
-                                "from pyflink.fn_execution.utils.operation_utils import create_serialized_scalar_operation_from_proto;"
-                                        + "print(create_serialized_scalar_operation_from_proto(%s, %s, %s))",
-                                Arrays.toString(getUserDefinedFunctionsProto().toByteArray()),
-                                isOneArg ? "True" : "False",
-                                isOneFieldResult ? "True" : "False")
-                    };
-            interpreter.exec(
-                    "from pyflink.fn_execution.utils.operation_utils import deserialized_operation_from_serialized_bytes");
-            interpreter.exec(
-                    String.format(
-                            "scalar_operation = deserialized_operation_from_serialized_bytes(%s)",
-                            executeScript(commands, env)));
-        } else {
-            LOG.info("Create Operation in multi-threads.");
-
-            // The CPython extension included in proto does not support initialization
-            // multiple times, so we choose the only interpreter process to be responsible for
-            // initialization and proto parsing. The only interpreter parses the proto and
-            // serializes function operations with cloudpickle.
-            interpreter.exec(
-                    "from pyflink.fn_execution.utils.operation_utils import create_scalar_operation_from_proto");
-            interpreter.set("proto", getUserDefinedFunctionsProto().toByteArray());
-
-            interpreter.exec(
-                    String.format(
-                            "scalar_operation = create_scalar_operation_from_proto(proto, %s, %s)",
-                            isOneArg ? "True" : "False", isOneFieldResult ? "True" : "False"));
-        }
+    public void openPythonInterpreter(String pythonExecutable, Map<String, String> env) {
+        LOG.info("Create Operation in multi-threads.");
+
+        // The CPython extension included in proto does not support initialization
+        // multiple times, so we choose the only interpreter process to be responsible for
+        // initialization and proto parsing. The only interpreter parses the proto and
+        // serializes function operations with cloudpickle.
+        interpreter.exec(
+                "from pyflink.fn_execution.utils.operation_utils import create_scalar_operation_from_proto");
+        interpreter.set("proto", getUserDefinedFunctionsProto().toByteArray());
+
+        interpreter.exec(
+                String.format(
+                        "scalar_operation = create_scalar_operation_from_proto(proto, %s, %s)",
+                        isOneArg ? "True" : "False", isOneFieldResult ? "True" : "False"));
 
         // invoke `open` method of ScalarOperation.
         interpreter.invokeMethod("scalar_operation", "open");
@@ -272,32 +240,4 @@ public class EmbeddedPythonScalarFunctionOperator
         builder.setProfileEnabled(pythonConfig.isProfileEnabled());
         return builder.build();
     }
-
-    private String executeScript(final String[] commands, Map<String, String> env)
-            throws IOException {
-        ProcessBuilder pb = new ProcessBuilder(commands);
-        pb.environment().putAll(env);
-        pb.redirectErrorStream(true);
-        Process p = pb.start();
-        InputStream in = new BufferedInputStream(p.getInputStream());
-        StringBuilder out = new StringBuilder();
-        String s;
-        try (BufferedReader br = new BufferedReader(new InputStreamReader(in))) {
-            while ((s = br.readLine()) != null) {
-                out.append(s).append("\n");
-            }
-        }
-        try {
-            if (p.waitFor() != 0) {
-                throw new IOException(
-                        String.format(
-                                "Failed to execute the command: %s\noutput: %s",
-                                String.join(" ", commands), out));
-            }
-        } catch (InterruptedException e) {
-            // Ignored. The subprocess is dead after "br.readLine()" returns null, so the call of
-            // "waitFor" should return intermediately.
-        }
-        return out.toString();
-    }
 }