Posted to commits@flink.apache.org by hx...@apache.org on 2022/03/21 08:45:06 UTC
[flink] branch master updated: [FLINK-26727][python] Fix the implementation of sub-interpreter in Thread Mode
This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b643fee [FLINK-26727][python] Fix the implementation of sub-interpreter in Thread Mode
b643fee is described below
commit b643feedc88e660df7c333870454763f011d3f01
Author: huangxingbo <hx...@gmail.com>
AuthorDate: Fri Mar 18 18:12:40 2022 +0800
[FLINK-26727][python] Fix the implementation of sub-interpreter in Thread Mode
This closes #19150.
---
.../docs/dev/python/python_execution_mode.md | 57 ++++++--------
.../docs/dev/python/python_execution_mode.md | 57 ++++++--------
.../shortcodes/generated/python_configuration.html | 2 +-
.../pyflink/table/tests/test_dependency.py | 15 +---
flink-python/pyflink/table/tests/test_udf.py | 15 +---
.../org/apache/flink/python/PythonOptions.java | 9 +--
.../embedded/EmbeddedPythonEnvironmentManager.java | 16 +---
.../AbstractEmbeddedPythonFunctionOperator.java | 8 +-
.../EmbeddedPythonScalarFunctionOperator.java | 90 ++++------------------
9 files changed, 75 insertions(+), 194 deletions(-)
diff --git a/docs/content.zh/docs/dev/python/python_execution_mode.md b/docs/content.zh/docs/dev/python/python_execution_mode.md
index c34da24..4bb3eb1 100644
--- a/docs/content.zh/docs/dev/python/python_execution_mode.md
+++ b/docs/content.zh/docs/dev/python/python_execution_mode.md
@@ -31,33 +31,24 @@ defines how to execute your customized Python functions.
Prior to release-1.15, there is the only execution mode called `PROCESS` execution mode. The `PROCESS`
mode means that the Python user-defined functions will be executed in separate Python processes.
-In release-1.15, it has introduced another two execution modes called `MULTI-THREAD` execution mode and
-`SUB-INTERPRETER` execution mode. The `MULTI-THREAD` mode means that the Python user-defined functions
-will be executed in the same thread as Java Operator, but it will be affected by GIL performance.
-The `SUB-INTERPRETER` mode means that the Python user-defined functions will be executed in Python
-different sub-interpreters rather than different threads of one interpreter, which can largely overcome
-the effects of the GIL, but some CPython extensions libraries doesn't support it, such as numpy, tensorflow, etc.
+In release-1.15, it has introduced a new execution mode called `THREAD` execution mode. The `THREAD`
+mode means that the Python user-defined functions will be executed in the same process as Java Operator,
+It should be noted that multiple Python user-defined functions running in the same JVM are still affected by GIL.
-## When can/should I use MULTI-THREAD execution mode or SUB-INTERPRETER execution mode?
+## When can/should I use THREAD execution mode?
-The purpose of the introduction of `MULTI-THREAD` mode and `SUB-INTERPRETER` mode is to overcome the
-overhead of serialization/deserialization and network communication caused in `PROCESS` mode.
-So if performance is not your concern, or the computing logic of your customized Python functions is
-the performance bottleneck of the job, `PROCESS` mode will be the best choice as `PROCESS` mode provides
-the best isolation compared to `MULTI-THREAD` mode and `SUB-INTERPRETER` mode.
-
-Compared to `MULTI-THREAD` execution mode, `SUB-INTERPRETER` execution mode can largely overcome the
-effects of the GIL, so you can get better performance usually. However, `SUB-INTERPRETER` may fail in some CPython
-extensions libraries, such as numpy, tensorflow. In this case, you should use `PROCESS` mode or `MULTI-THREAD` mode.
+The purpose of the introduction of `THREAD` mode is to overcome the overhead of serialization/deserialization
+and network communication caused in `PROCESS` mode. So if performance is not your concern, or the computing
+logic of your customized Python functions is the performance bottleneck of the job, `PROCESS` mode will
+be the best choice as `PROCESS` mode provides the best isolation compared to `THREAD` mode.
## Configuring Python execution mode
The execution mode can be configured via the `python.execution-mode` setting.
-There are three possible values:
+There are two possible values:
- `PROCESS`: The Python user-defined functions will be executed in separate Python process. (default)
- - `MULTI-THREAD`: The Python user-defined functions will be executed in the same thread as Java Operator.
- - `SUB-INTERPRETER`: The Python user-defined functions will be executed in Python different sub-interpreters.
+ - `THREAD`: The Python user-defined functions will be executed in the same process as Java operator.
You could specify the Python execution mode using Python Table API as following:
@@ -65,27 +56,23 @@ You could specify the Python execution mode using Python Table API as following:
# Specify `PROCESS` mode
table_env.get_config().get_configuration().set_string("python.execution-mode", "process")
-# Specify `MULTI-THREAD` mode
-table_env.get_config().get_configuration().set_string("python.execution-mode", "multi-thread")
-
-# Specify `SUB-INTERPRETER` mode
-table_env.get_config().get_configuration().set_string("python.execution-mode", "sub-interpreter")
+# Specify `THREAD` mode
+table_env.get_config().get_configuration().set_string("python.execution-mode", "thread")
```
{{< hint info >}}
-Currently, it still doesn't support to execute Python UDFs in `MULTI-THREAD` and `SUB-INTERPRETER` execution mode
-in all places. It will fall back to `PROCESS` execution mode in these cases. So it may happen that you configure a job
-to execute in `MULTI-THREAD` or `SUB-INTERPRETER` execution modes, however, it's actually executed in `PROCESS` execution mode.
+Currently, it still doesn't support to execute Python UDFs in `THREAD` execution mode in all places.
+It will fall back to `PROCESS` execution mode in these cases. So it may happen that you configure a job
+to execute in `THREAD` execution mode, however, it's actually executed in `PROCESS` execution mode.
{{< /hint >}}
{{< hint info >}}
-`MULTI-THREAD` execution mode only supports Python 3.7+. `SUB-INTERPRETER` execution mode only supports Python 3.8+.
+`THREAD` execution mode is only supported in Python 3.7+.
{{< /hint >}}
## Execution Behavior
-This section provides an overview of the execution behavior of `MULTI-THREAD` and `SUB-INTERPRETER`
-execution mode and contrasts they with `PROCESS` execution mode. For more
-details, please refer to the FLIP that introduced this feature:
+This section provides an overview of the execution behavior of `THREAD` execution mode and contrasts
+they with `PROCESS` execution mode. For more details, please refer to the FLIP that introduced this feature:
[FLIP-206](https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode).
#### PROCESS Execution Mode
@@ -95,10 +82,10 @@ The Java operator process communicates with the Python worker process using vari
{{< img src="/fig/pyflink_process_execution_mode.png" alt="Process Execution Mode" >}}
-#### MULTI-THREAD and SUB-INTERPRETER Execution Mode
+#### THREAD Execution Mode
-In `MULTI-THREAD` and `SUB-INTERPRETER` execution mode, the Python user-defined functions will be executed in
-the same process as Java operators. PyFlink takes use of third part library [PEMJA](https://github.com/alibaba/pemja) to
-embed Python in Java Application.
+In `THREAD` execution mode, the Python user-defined functions will be executed in the same process
+as Java operators. PyFlink takes use of third part library [PEMJA](https://github.com/alibaba/pemja)
+to embed Python in Java Application.
{{< img src="/fig/pyflink_embedded_execution_mode.png" alt="Embedded Execution Mode" >}}
diff --git a/docs/content/docs/dev/python/python_execution_mode.md b/docs/content/docs/dev/python/python_execution_mode.md
index 2c96958..4f5ecf2 100644
--- a/docs/content/docs/dev/python/python_execution_mode.md
+++ b/docs/content/docs/dev/python/python_execution_mode.md
@@ -31,33 +31,24 @@ defines how to execute your customized Python functions.
Prior to release-1.15, there is the only execution mode called `PROCESS` execution mode. The `PROCESS`
mode means that the Python user-defined functions will be executed in separate Python processes.
-In release-1.15, it has introduced another two execution modes called `MULTI-THREAD` execution mode and
-`SUB-INTERPRETER` execution mode. The `MULTI-THREAD` mode means that the Python user-defined functions
-will be executed in the same thread as Java Operator, but it will be affected by GIL performance.
-The `SUB-INTERPRETER` mode means that the Python user-defined functions will be executed in Python
-different sub-interpreters rather than different threads of one interpreter, which can largely overcome
-the effects of the GIL, but some CPython extensions libraries doesn't support it, such as numpy, tensorflow, etc.
+In release-1.15, it has introduced a new execution mode called `THREAD` execution mode. The `THREAD`
+mode means that the Python user-defined functions will be executed in the same process as Java Operator,
+It should be noted that multiple Python user-defined functions running in the same JVM are still affected by GIL.
-## When can/should I use MULTI-THREAD execution mode or SUB-INTERPRETER execution mode?
+## When can/should I use THREAD execution mode?
-The purpose of the introduction of `MULTI-THREAD` mode and `SUB-INTERPRETER` mode is to overcome the
-overhead of serialization/deserialization and network communication caused in `PROCESS` mode.
-So if performance is not your concern, or the computing logic of your customized Python functions is
-the performance bottleneck of the job, `PROCESS` mode will be the best choice as `PROCESS` mode provides
-the best isolation compared to `MULTI-THREAD` mode and `SUB-INTERPRETER` mode.
-
-Compared to `MULTI-THREAD` execution mode, `SUB-INTERPRETER` execution mode can largely overcome the
-effects of the GIL, so you can get better performance usually. However, `SUB-INTERPRETER` may fail in some CPython
-extensions libraries, such as numpy, tensorflow. In this case, you should use `PROCESS` mode or `MULTI-THREAD` mode.
+The purpose of the introduction of `THREAD` mode is to overcome the overhead of serialization/deserialization
+and network communication caused in `PROCESS` mode. So if performance is not your concern, or the computing
+logic of your customized Python functions is the performance bottleneck of the job, `PROCESS` mode will
+be the best choice as `PROCESS` mode provides the best isolation compared to `THREAD` mode.
## Configuring Python execution mode
The execution mode can be configured via the `python.execution-mode` setting.
-There are three possible values:
+There are two possible values:
- `PROCESS`: The Python user-defined functions will be executed in separate Python process. (default)
- - `MULTI-THREAD`: The Python user-defined functions will be executed in the same thread as Java Operator.
- - `SUB-INTERPRETER`: The Python user-defined functions will be executed in Python different sub-interpreters.
+ - `THREAD`: The Python user-defined functions will be executed in the same process as Java operator.
You could specify the Python execution mode using Python Table API as following:
@@ -65,27 +56,23 @@ You could specify the Python execution mode using Python Table API as following:
# Specify `PROCESS` mode
table_env.get_config().get_configuration().set_string("python.execution-mode", "process")
-# Specify `MULTI-THREAD` mode
-table_env.get_config().get_configuration().set_string("python.execution-mode", "multi-thread")
-
-# Specify `SUB-INTERPRETER` mode
-table_env.get_config().get_configuration().set_string("python.execution-mode", "sub-interpreter")
+# Specify `THREAD` mode
+table_env.get_config().get_configuration().set_string("python.execution-mode", "thread")
```
{{< hint info >}}
-Currently, it still doesn't support to execute Python UDFs in `MULTI-THREAD` and `SUB-INTERPRETER` execution mode
-in all places. It will fall back to `PROCESS` execution mode in these cases. So it may happen that you configure a job
-to execute in `MULTI-THREAD` or `SUB-INTERPRETER` execution modes, however, it's actually executed in `PROCESS` execution mode.
+Currently, it still doesn't support to execute Python UDFs in `THREAD` execution mode in all places.
+It will fall back to `PROCESS` execution mode in these cases. So it may happen that you configure a job
+to execute in `THREAD` execution mode, however, it's actually executed in `PROCESS` execution mode.
{{< /hint >}}
{{< hint info >}}
-`MULTI-THREAD` execution mode only supports Python 3.7+. `SUB-INTERPRETER` execution mode only supports Python 3.8+.
+`THREAD` execution mode is only supported in Python 3.7+.
{{< /hint >}}
## Execution Behavior
-This section provides an overview of the execution behavior of `MULTI-THREAD` and `SUB-INTERPRETER`
-execution mode and contrasts they with `PROCESS` execution mode. For more
-details, please refer to the FLIP that introduced this feature:
+This section provides an overview of the execution behavior of `THREAD` execution mode and contrasts
+they with `PROCESS` execution mode. For more details, please refer to the FLIP that introduced this feature:
[FLIP-206](https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode).
#### PROCESS Execution Mode
@@ -95,10 +82,10 @@ The Java operator process communicates with the Python worker process using vari
{{< img src="/fig/pyflink_process_execution_mode.png" alt="Process Execution Mode" >}}
-#### MULTI-THREAD and SUB-INTERPRETER Execution Mode
+#### THREAD Execution Mode
-In `MULTI-THREAD` and `SUB-INTERPRETER` execution mode, the Python user-defined functions will be executed in
-the same process as Java operators. PyFlink takes use of third part library [PEMJA](https://github.com/alibaba/pemja) to
-embed Python in Java Application.
+In `THREAD` execution mode, the Python user-defined functions will be executed in the same process
+as Java operators. PyFlink takes use of third part library [PEMJA](https://github.com/alibaba/pemja)
+to embed Python in Java Application.
{{< img src="/fig/pyflink_embedded_execution_mode.png" alt="Embedded Execution Mode" >}}
diff --git a/docs/layouts/shortcodes/generated/python_configuration.html b/docs/layouts/shortcodes/generated/python_configuration.html
index 9ad42d8..cdaabcf 100644
--- a/docs/layouts/shortcodes/generated/python_configuration.html
+++ b/docs/layouts/shortcodes/generated/python_configuration.html
@@ -30,7 +30,7 @@
<td><h5>python.execution-mode</h5></td>
<td style="word-wrap: break-word;">"process"</td>
<td>String</td>
- <td>Specify the python runtime execution mode. The optional values are `process`, `multi-thread` and `sub-interpreter`. The `process` mode means that the Python user-defined functions will be executed in separate Python process. The `multi-thread` mode means that the Python user-defined functions will be executed in the same thread as Java Operator, but it will be affected by GIL performance. The `sub-interpreter` mode means that the Python user-defined functions will be exec [...]
+ <td>Specify the python runtime execution mode. The optional values are `process` and `thread`. The `process` mode means that the Python user-defined functions will be executed in separate Python process. The `thread` mode means that the Python user-defined functions will be executed in the same process of the Java operator. Note that currently it still doesn't support to execute Python user-defined functions in `thread` mode in all places. It will fall back to `process` mode [...]
</tr>
<tr>
<td><h5>python.files</h5></td>
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index b180c72..5c4ae84 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -93,19 +93,10 @@ class DependencyTests(object):
@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
-class EmbeddedMultiThreadDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
+class EmbeddedThreadDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
def setUp(self):
- super(EmbeddedMultiThreadDependencyTests, self).setUp()
- self.t_env.get_config().get_configuration().set_string("python.execution-mode",
- "multi-thread")
-
-
-@pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8")
-class EmbeddedSubInterpreterDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
- def setUp(self):
- super(EmbeddedSubInterpreterDependencyTests, self).setUp()
- self.t_env.get_config().get_configuration().set_string("python.execution-mode",
- "sub-interpreter")
+ super(EmbeddedThreadDependencyTests, self).setUp()
+ self.t_env.get_config().get_configuration().set_string("python.execution-mode", "thread")
class BatchDependencyTests(DependencyTests, PyFlinkBatchTableTestCase):
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 6abdee2..3e7562a 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -797,19 +797,10 @@ class PyFlinkBatchUserDefinedFunctionTests(UserDefinedFunctionTests,
@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
-class PyFlinkEmbeddedMultiThreadTests(UserDefinedFunctionTests, PyFlinkBatchTableTestCase):
+class PyFlinkEmbeddedThreadTests(UserDefinedFunctionTests, PyFlinkBatchTableTestCase):
def setUp(self):
- super(PyFlinkEmbeddedMultiThreadTests, self).setUp()
- self.t_env.get_config().get_configuration().set_string("python.execution-mode",
- "multi-thread")
-
-
-@pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8")
-class PyFlinkEmbeddedSubInterpreterTests(UserDefinedFunctionTests, PyFlinkBatchTableTestCase):
- def setUp(self):
- super(PyFlinkEmbeddedSubInterpreterTests, self).setUp()
- self.t_env.get_config().get_configuration().set_string("python.execution-mode",
- "sub-interpreter")
+ super(PyFlinkEmbeddedThreadTests, self).setUp()
+ self.t_env.get_config().get_configuration().set_string("python.execution-mode", "thread")
# test specify the input_types
diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index 649e2e6..78cd3b5 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -231,10 +231,9 @@ public class PythonOptions {
.stringType()
.defaultValue("process")
.withDescription(
- "Specify the python runtime execution mode. The optional values are `process`, `multi-thread` and `sub-interpreter`. "
+ "Specify the python runtime execution mode. The optional values are `process` and `thread`. "
+ "The `process` mode means that the Python user-defined functions will be executed in separate Python process. "
- + "The `multi-thread` mode means that the Python user-defined functions will be executed in the same thread as Java Operator, but it will be affected by GIL performance. "
- + "The `sub-interpreter` mode means that the Python user-defined functions will be executed in python different sub-interpreters rather than different threads of one interpreter, "
- + "which can largely overcome the effects of the GIL, but it maybe fail in some CPython extensions libraries, such as numpy, tensorflow. "
- + "Note that if the python operator dose not support `multi-thread` and `sub-interpreter` mode, we will still use `process` mode.");
+ + "The `thread` mode means that the Python user-defined functions will be executed in the same process of the Java operator. "
+ + "Note that currently it still doesn't support to execute Python user-defined functions in `thread` mode in all places. "
+ + "It will fall back to `process` mode in these cases.");
}
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
index 1d7e9a0..750284e 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
@@ -54,9 +54,7 @@ public class EmbeddedPythonEnvironmentManager extends AbstractPythonEnvironmentM
String executionMode = dependencyInfo.getExecutionMode();
- if (executionMode.equalsIgnoreCase("sub-interpreter")) {
- execType = PythonInterpreterConfig.ExecType.SUB_INTERPRETER;
- } else if (executionMode.equalsIgnoreCase("multi-thread")) {
+ if (executionMode.equalsIgnoreCase("thread")) {
execType = PythonInterpreterConfig.ExecType.MULTI_THREAD;
} else {
throw new RuntimeException(
@@ -66,16 +64,8 @@ public class EmbeddedPythonEnvironmentManager extends AbstractPythonEnvironmentM
String pythonVersion =
PythonEnvironmentManagerUtils.getPythonVersion(dependencyInfo.getPythonExec());
- if (execType == PythonInterpreterConfig.ExecType.SUB_INTERPRETER) {
- if (pythonVersion.compareTo("3.8") < 0) {
- throw new RuntimeException(
- "`SUB-INTERPRETER` execution mode only supports Python 3.8+");
- }
- } else {
- if (pythonVersion.compareTo("3.7") < 0) {
- throw new RuntimeException(
- "`MULTI-THREAD` execution mode only supports Python 3.7+");
- }
+ if (pythonVersion.compareTo("3.7") < 0) {
+ throw new RuntimeException("`THREAD` execution mode only supports Python 3.7+");
}
if (env.containsKey("FLINK_TESTING")) {
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
index f23e3d2..be709e6 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
@@ -105,7 +105,7 @@ public abstract class AbstractEmbeddedPythonFunctionOperator<OUT>
}
}
- openPythonInterpreter(pythonConfig.getPythonExec(), env, interpreterConfig.getExecType());
+ openPythonInterpreter(pythonConfig.getPythonExec(), env);
}
@Override
@@ -152,11 +152,7 @@ public abstract class AbstractEmbeddedPythonFunctionOperator<OUT>
}
/** Setup method for Python Interpreter. It can be used for initialization work. */
- public abstract void openPythonInterpreter(
- String pythonExecutable,
- Map<String, String> env,
- PythonInterpreterConfig.ExecType execType)
- throws Exception;
+ public abstract void openPythonInterpreter(String pythonExecutable, Map<String, String> env);
/** Returns the {@link PythonEnv} used to create PythonEnvironmentManager. */
public abstract PythonEnv getPythonEnv();
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
index 848b678..3c843cd 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
@@ -40,13 +40,6 @@ import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
-import pemja.core.PythonInterpreterConfig;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
@@ -156,46 +149,21 @@ public class EmbeddedPythonScalarFunctionOperator
}
@Override
- public void openPythonInterpreter(
- String pythonExecutable,
- Map<String, String> env,
- PythonInterpreterConfig.ExecType execType)
- throws Exception {
- if (execType.equals(PythonInterpreterConfig.ExecType.SUB_INTERPRETER)) {
- LOG.info("Create Operation in sub-interpreters.");
- String[] commands =
- new String[] {
- pythonExecutable,
- "-c",
- String.format(
- "from pyflink.fn_execution.utils.operation_utils import create_serialized_scalar_operation_from_proto;"
- + "print(create_serialized_scalar_operation_from_proto(%s, %s, %s))",
- Arrays.toString(getUserDefinedFunctionsProto().toByteArray()),
- isOneArg ? "True" : "False",
- isOneFieldResult ? "True" : "False")
- };
- interpreter.exec(
- "from pyflink.fn_execution.utils.operation_utils import deserialized_operation_from_serialized_bytes");
- interpreter.exec(
- String.format(
- "scalar_operation = deserialized_operation_from_serialized_bytes(%s)",
- executeScript(commands, env)));
- } else {
- LOG.info("Create Operation in multi-threads.");
-
- // The CPython extension included in proto does not support initialization
- // multiple times, so we choose the only interpreter process to be responsible for
- // initialization and proto parsing. The only interpreter parses the proto and
- // serializes function operations with cloudpickle.
- interpreter.exec(
- "from pyflink.fn_execution.utils.operation_utils import create_scalar_operation_from_proto");
- interpreter.set("proto", getUserDefinedFunctionsProto().toByteArray());
-
- interpreter.exec(
- String.format(
- "scalar_operation = create_scalar_operation_from_proto(proto, %s, %s)",
- isOneArg ? "True" : "False", isOneFieldResult ? "True" : "False"));
- }
+ public void openPythonInterpreter(String pythonExecutable, Map<String, String> env) {
+ LOG.info("Create Operation in multi-threads.");
+
+ // The CPython extension included in proto does not support initialization
+ // multiple times, so we choose the only interpreter process to be responsible for
+ // initialization and proto parsing. The only interpreter parses the proto and
+ // serializes function operations with cloudpickle.
+ interpreter.exec(
+ "from pyflink.fn_execution.utils.operation_utils import create_scalar_operation_from_proto");
+ interpreter.set("proto", getUserDefinedFunctionsProto().toByteArray());
+
+ interpreter.exec(
+ String.format(
+ "scalar_operation = create_scalar_operation_from_proto(proto, %s, %s)",
+ isOneArg ? "True" : "False", isOneFieldResult ? "True" : "False"));
// invoke `open` method of ScalarOperation.
interpreter.invokeMethod("scalar_operation", "open");
@@ -272,32 +240,4 @@ public class EmbeddedPythonScalarFunctionOperator
builder.setProfileEnabled(pythonConfig.isProfileEnabled());
return builder.build();
}
-
- private String executeScript(final String[] commands, Map<String, String> env)
- throws IOException {
- ProcessBuilder pb = new ProcessBuilder(commands);
- pb.environment().putAll(env);
- pb.redirectErrorStream(true);
- Process p = pb.start();
- InputStream in = new BufferedInputStream(p.getInputStream());
- StringBuilder out = new StringBuilder();
- String s;
- try (BufferedReader br = new BufferedReader(new InputStreamReader(in))) {
- while ((s = br.readLine()) != null) {
- out.append(s).append("\n");
- }
- }
- try {
- if (p.waitFor() != 0) {
- throw new IOException(
- String.format(
- "Failed to execute the command: %s\noutput: %s",
- String.join(" ", commands), out));
- }
- } catch (InterruptedException e) {
- // Ignored. The subprocess is dead after "br.readLine()" returns null, so the call of
- // "waitFor" should return intermediately.
- }
- return out.toString();
- }
}
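
Note on the version check in EmbeddedPythonEnvironmentManager: the retained condition `pythonVersion.compareTo("3.7") < 0` compares strings lexicographically, so a value such as "3.10" sorts before "3.7" and would be rejected. The following is a minimal sketch of a numeric comparison, not part of this commit. The class `PythonVersionCheck` and its methods are hypothetical names, and the sketch assumes the version string consists of dot-separated integers such as "3.10" or "3.8.12".

```java
import java.util.Arrays;

/** Hypothetical helper (not in the commit): compares Python versions numerically. */
final class PythonVersionCheck {

    private PythonVersionCheck() {}

    /** Parses a version such as "3.10" or "3.8.12" into its numeric components. */
    static int[] parse(String version) {
        // Assumption: every dot-separated component is a plain integer.
        return Arrays.stream(version.trim().split("\\."))
                .mapToInt(Integer::parseInt)
                .toArray();
    }

    /** Returns true if {@code version} is at least {@code major.minor}. */
    static boolean isAtLeast(String version, int major, int minor) {
        int[] parts = parse(version);
        int actualMajor = parts[0];
        // A bare major version such as "3" is treated as "3.0".
        int actualMinor = parts.length > 1 ? parts[1] : 0;
        if (actualMajor != major) {
            return actualMajor > major;
        }
        return actualMinor >= minor;
    }
}
```

With this sketch, `PythonVersionCheck.isAtLeast("3.10", 3, 7)` returns true, whereas `"3.10".compareTo("3.7") < 0` is also true and would trigger the "only supports Python 3.7+" exception. Pre-release suffixes such as "rc1" are not handled and would raise a NumberFormatException.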