You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hx...@apache.org on 2022/01/27 02:16:47 UTC
[flink] branch master updated: [FLINK-25719][python] Support General Python UDF in Thread Mode
This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 51eb386 [FLINK-25719][python] Support General Python UDF in Thread Mode
51eb386 is described below
commit 51eb386048484502f88d8806b75c17b04b91a613
Author: huangxingbo <hx...@gmail.com>
AuthorDate: Thu Jan 20 17:47:20 2022 +0800
[FLINK-25719][python] Support General Python UDF in Thread Mode
This closes #18418.
---
.../shortcodes/generated/python_configuration.html | 6 +
flink-python/dev/dev-requirements.txt | 1 +
flink-python/pom.xml | 8 +
.../pyflink/fn_execution/beam/beam_operations.py | 17 +-
.../fn_execution/beam/beam_operations_fast.pyx | 4 +-
.../fn_execution/beam/beam_operations_slow.py | 4 +-
.../pyflink/fn_execution/coder_impl_fast.pyx | 5 +-
.../pyflink/fn_execution/coder_impl_slow.py | 5 +-
flink-python/pyflink/fn_execution/coders.py | 90 +++---
.../pyflink/fn_execution/datastream/operations.py | 33 ++-
.../fn_execution/datastream/runtime_context.py | 5 +-
.../fn_execution/datastream/timerservice_impl.py | 16 +-
.../datastream/window/window_operator.py | 3 +-
.../pyflink/fn_execution/table/aggregate_fast.pyx | 7 +-
.../pyflink/fn_execution/table/aggregate_slow.py | 7 +-
.../pyflink/fn_execution/table/operations.py | 84 +++---
.../pyflink/fn_execution/table/state_data_view.py | 7 +-
.../fn_execution/table/window_aggregate_fast.pyx | 5 +-
.../fn_execution/table/window_aggregate_slow.py | 5 +-
.../pyflink/fn_execution/table/window_context.py | 9 +-
.../fn_execution/table/window_process_function.py | 4 +-
.../fn_execution/tests/test_process_mode_boot.py | 2 +-
.../pyflink/fn_execution/utils/operation_utils.py | 51 +++-
.../pyflink/table/tests/test_dependency.py | 70 +++--
flink-python/pyflink/table/tests/test_udf.py | 51 +++-
flink-python/setup.py | 1 +
.../java/org/apache/flink/python/PythonConfig.java | 8 +
.../org/apache/flink/python/PythonOptions.java | 13 +
....java => AbstractPythonEnvironmentManager.java} | 317 +++++++++------------
.../flink/python/env/PythonDependencyInfo.java | 32 ++-
.../flink/python/env/PythonEnvironmentManager.java | 20 +-
.../EmbeddedPythonEnvironment.java} | 23 +-
.../embedded/EmbeddedPythonEnvironmentManager.java | 84 ++++++
.../{ => process}/ProcessPythonEnvironment.java | 3 +-
.../process/ProcessPythonEnvironmentManager.java | 100 +++++++
.../python/util/PythonEnvironmentManagerUtils.java | 2 +-
.../AbstractDataStreamPythonFunctionOperator.java | 2 +-
.../AbstractEmbeddedPythonFunctionOperator.java | 166 +++++++++++
.../AbstractExternalPythonFunctionOperator.java | 154 ++++++++++
.../python/AbstractPythonFunctionOperator.java | 110 +------
.../beam/BeamDataStreamPythonFunctionRunner.java | 4 +-
.../python/beam/BeamPythonFunctionRunner.java | 10 +-
.../AbstractOneInputPythonFunctionOperator.java | 4 +-
.../EmbeddedPythonScalarFunctionOperator.java | 303 ++++++++++++++++++++
.../python/beam/BeamTablePythonFunctionRunner.java | 8 +-
.../table/runtime/typeutils/PythonTypeUtils.java | 272 ++++++++++++++++++
flink-python/src/main/resources/META-INF/NOTICE | 1 +
.../ProcessPythonEnvironmentManagerTest.java | 18 +-
...ghPythonStreamGroupWindowAggregateOperator.java | 2 +-
.../PythonStreamGroupAggregateOperatorTest.java | 2 +-
...ythonStreamGroupTableAggregateOperatorTest.java | 2 +-
...owPythonGroupAggregateFunctionOperatorTest.java | 2 +-
...onGroupWindowAggregateFunctionOperatorTest.java | 2 +-
...honOverWindowAggregateFunctionOperatorTest.java | 2 +-
...onGroupWindowAggregateFunctionOperatorTest.java | 2 +-
...rrowPythonProcTimeBoundedRangeOperatorTest.java | 2 +-
...ArrowPythonProcTimeBoundedRowsOperatorTest.java | 2 +-
...ArrowPythonRowTimeBoundedRangeOperatorTest.java | 2 +-
...mArrowPythonRowTimeBoundedRowsOperatorTest.java | 2 +-
.../scalar/PythonScalarFunctionOperatorTest.java | 2 +-
.../ArrowPythonScalarFunctionOperatorTest.java | 2 +-
.../table/PythonTableFunctionOperatorTest.java | 2 +-
.../runtime/typeutils/PythonTypeUtilsTest.java | 13 +
.../PassThroughPythonAggregateFunctionRunner.java | 4 +-
.../PassThroughPythonScalarFunctionRunner.java | 4 +-
.../PassThroughPythonTableFunctionRunner.java | 4 +-
...ThroughStreamAggregatePythonFunctionRunner.java | 4 +-
...amGroupWindowAggregatePythonFunctionRunner.java | 4 +-
...ghStreamTableAggregatePythonFunctionRunner.java | 4 +-
.../flink/table/runtime/utils/PythonTestUtils.java | 5 +-
.../nodes/exec/common/CommonExecPythonCalc.java | 106 +++++--
.../plan/nodes/exec/utils/CommonPythonUtil.java | 14 +
72 files changed, 1762 insertions(+), 582 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/python_configuration.html b/docs/layouts/shortcodes/generated/python_configuration.html
index b9d0559..9ad42d8 100644
--- a/docs/layouts/shortcodes/generated/python_configuration.html
+++ b/docs/layouts/shortcodes/generated/python_configuration.html
@@ -27,6 +27,12 @@
<td>Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.6+, Apache Beam (version == 2.27.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec".</td>
</tr>
<tr>
+ <td><h5>python.execution-mode</h5></td>
+ <td style="word-wrap: break-word;">"process"</td>
+ <td>String</td>
+ <td>Specify the Python runtime execution mode. The optional values are `process`, `multi-thread` and `sub-interpreter`. The `process` mode means that the Python user-defined functions will be executed in a separate Python process. The `multi-thread` mode means that the Python user-defined functions will be executed in the same thread as the Java operator, but performance will be affected by the GIL. The `sub-interpreter` mode means that the Python user-defined functions will be exec [...]
+ </tr>
+ <tr>
<td><h5>python.files</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
diff --git a/flink-python/dev/dev-requirements.txt b/flink-python/dev/dev-requirements.txt
index c6b0ef30..bd75137 100755
--- a/flink-python/dev/dev-requirements.txt
+++ b/flink-python/dev/dev-requirements.txt
@@ -27,3 +27,4 @@ numpy>=1.14.3,<1.20
fastavro>=0.21.4,<0.24
grpcio>=1.29.0,<2
grpcio-tools>=1.3.5,<=1.14.2
+pemja==0.1.2; python_version >= '3.7'
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index b14e5d1..304a590 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -99,6 +99,13 @@ under the License.
<artifactId>beam-runners-core-java</artifactId>
</dependency>
+ <!-- PemJa dependencies -->
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>pemja</artifactId>
+ <version>0.1.2</version>
+ </dependency>
+
<!-- Protobuf dependencies -->
<dependency>
@@ -375,6 +382,7 @@ under the License.
<include>org.apache.arrow:*</include>
<include>io.netty:*</include>
<include>com.google.flatbuffers:*</include>
+ <include>com.alibaba:pemja</include>
</includes>
</artifactSet>
<filters>
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations.py b/flink-python/pyflink/fn_execution/beam/beam_operations.py
index 04fc5d4..17074b2 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations.py
@@ -151,12 +151,13 @@ def _create_user_defined_function_operation(factory, transform_proto, consumers,
side_inputs=None,
output_coders=[output_coders[tag] for tag in output_tags])
- if hasattr(spec.serialized_fn, "key_type"):
+ serialized_fn = spec.serialized_fn
+ if hasattr(serialized_fn, "key_type"):
# keyed operation, need to create the KeyedStateBackend.
- row_schema = spec.serialized_fn.key_type.row_schema
+ row_schema = serialized_fn.key_type.row_schema
key_row_coder = FlattenRowCoder([from_proto(f.type) for f in row_schema.fields])
- if spec.serialized_fn.HasField('group_window'):
- if spec.serialized_fn.group_window.is_time_window:
+ if serialized_fn.HasField('group_window'):
+ if serialized_fn.group_window.is_time_window:
window_coder = TimeWindowCoder()
else:
window_coder = CountWindowCoder()
@@ -166,9 +167,9 @@ def _create_user_defined_function_operation(factory, transform_proto, consumers,
factory.state_handler,
key_row_coder,
window_coder,
- spec.serialized_fn.state_cache_size,
- spec.serialized_fn.map_state_read_cache_size,
- spec.serialized_fn.map_state_write_cache_size)
+ serialized_fn.state_cache_size,
+ serialized_fn.map_state_read_cache_size,
+ serialized_fn.map_state_write_cache_size)
return beam_operation_cls(
transform_proto.unique_name,
@@ -179,7 +180,7 @@ def _create_user_defined_function_operation(factory, transform_proto, consumers,
internal_operation_cls,
keyed_state_backend)
elif internal_operation_cls == datastream_operations.StatefulOperation:
- key_row_coder = from_type_info_proto(spec.serialized_fn.key_type_info)
+ key_row_coder = from_type_info_proto(serialized_fn.key_type_info)
keyed_state_backend = RemoteKeyedStateBackend(
factory.state_handler,
key_row_coder,
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
index c7e5a09..a9ace9d 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
@@ -199,7 +199,7 @@ cdef class StatelessFunctionOperation(FunctionOperation):
name, spec, counter_factory, sampler, consumers, operation_cls)
cdef object generate_operation(self):
- return self.operation_cls(self.spec)
+ return self.operation_cls(self.spec.serialized_fn)
cdef class StatefulFunctionOperation(FunctionOperation):
@@ -211,7 +211,7 @@ cdef class StatefulFunctionOperation(FunctionOperation):
name, spec, counter_factory, sampler, consumers, operation_cls)
cdef object generate_operation(self):
- return self.operation_cls(self.spec, self._keyed_state_backend)
+ return self.operation_cls(self.spec.serialized_fn, self._keyed_state_backend)
cpdef void add_timer_info(self, timer_family_id, timer_info):
# ignore timer_family_id
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py b/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
index 809a4b0..57a375a 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
@@ -149,7 +149,7 @@ class StatelessFunctionOperation(FunctionOperation):
name, spec, counter_factory, sampler, consumers, operation_cls)
def generate_operation(self):
- return self.operation_cls(self.spec)
+ return self.operation_cls(self.spec.serialized_fn)
class StatefulFunctionOperation(FunctionOperation):
@@ -161,7 +161,7 @@ class StatefulFunctionOperation(FunctionOperation):
name, spec, counter_factory, sampler, consumers, operation_cls)
def generate_operation(self):
- return self.operation_cls(self.spec, self._keyed_state_backend)
+ return self.operation_cls(self.spec.serialized_fn, self._keyed_state_backend)
def add_timer_info(self, timer_family_id: str, timer_info: TimerInfo):
# ignore timer_family_id
diff --git a/flink-python/pyflink/fn_execution/coder_impl_fast.pyx b/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
index c99ff5c..75c7706 100644
--- a/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
+++ b/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
@@ -28,7 +28,6 @@ import pickle
from typing import List, Union
import cloudpickle
-import pyarrow as pa
from pyflink.common import Row, RowKind
from pyflink.common.time import Instant
@@ -437,6 +436,8 @@ cdef class ArrowCoderImpl(FieldCoderImpl):
self._batch_reader = self._load_from_stream(self._resettable_io)
cpdef encode_to_stream(self, cols, OutputStream out_stream):
+ import pyarrow as pa
+
self._resettable_io.set_output_stream(out_stream)
batch_writer = pa.RecordBatchStreamWriter(self._resettable_io, self._schema)
batch_writer.write_batch(
@@ -451,6 +452,8 @@ cdef class ArrowCoderImpl(FieldCoderImpl):
return arrow_to_pandas(self._timezone, self._field_types, [next(self._batch_reader)])
def _load_from_stream(self, stream):
+ import pyarrow as pa
+
while stream.readable():
reader = pa.ipc.open_stream(stream)
yield reader.read_next_batch()
diff --git a/flink-python/pyflink/fn_execution/coder_impl_slow.py b/flink-python/pyflink/fn_execution/coder_impl_slow.py
index 77aff11..dc3c186 100644
--- a/flink-python/pyflink/fn_execution/coder_impl_slow.py
+++ b/flink-python/pyflink/fn_execution/coder_impl_slow.py
@@ -22,7 +22,6 @@ from abc import ABC, abstractmethod
from typing import List
import cloudpickle
-import pyarrow as pa
from pyflink.common import Row, RowKind
from pyflink.common.time import Instant
@@ -281,6 +280,8 @@ class ArrowCoderImpl(FieldCoderImpl):
self._batch_reader = ArrowCoderImpl._load_from_stream(self._resettable_io)
def encode_to_stream(self, cols, out_stream: OutputStream):
+ import pyarrow as pa
+
self._resettable_io.set_output_stream(out_stream)
batch_writer = pa.RecordBatchStreamWriter(self._resettable_io, self._schema)
batch_writer.write_batch(
@@ -296,6 +297,8 @@ class ArrowCoderImpl(FieldCoderImpl):
@staticmethod
def _load_from_stream(stream):
+ import pyarrow as pa
+
while stream.readable():
reader = pa.ipc.open_stream(stream)
yield reader.read_next_batch()
diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py
index 8ef5607..2ddd2f3 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -19,14 +19,12 @@
import os
from abc import ABC, abstractmethod
-import pyarrow as pa
import pytz
from pyflink.common.typeinfo import TypeInformation, BasicTypeInfo, BasicType, DateTypeInfo, \
TimeTypeInfo, TimestampTypeInfo, PrimitiveArrayTypeInfo, BasicArrayTypeInfo, TupleTypeInfo, \
MapTypeInfo, ListTypeInfo, RowTypeInfo, PickledBytesTypeInfo, ObjectArrayTypeInfo, \
ExternalTypeInfo
-from pyflink.fn_execution import flink_fn_execution_pb2
from pyflink.table.types import TinyIntType, SmallIntType, IntType, BigIntType, BooleanType, \
FloatType, DoubleType, VarCharType, VarBinaryType, DecimalType, DateType, TimeType, \
LocalZonedTimestampType, RowType, RowField, to_arrow_type, TimestampType, ArrayType
@@ -59,6 +57,8 @@ class LengthPrefixBaseCoder(ABC):
@classmethod
def from_coder_info_descriptor_proto(cls, coder_info_descriptor_proto):
+ from pyflink.fn_execution import flink_fn_execution_pb2
+
field_coder = cls._to_field_coder(coder_info_descriptor_proto)
mode = coder_info_descriptor_proto.mode
separated_with_end_message = coder_info_descriptor_proto.separated_with_end_message
@@ -98,11 +98,15 @@ class LengthPrefixBaseCoder(ABC):
@classmethod
def _to_arrow_schema(cls, row_type):
+ import pyarrow as pa
+
return pa.schema([pa.field(n, to_arrow_type(t), t._nullable)
for n, t in zip(row_type.field_names(), row_type.field_types())])
@classmethod
def _to_data_type(cls, field_type):
+ from pyflink.fn_execution import flink_fn_execution_pb2
+
if field_type.type_name == flink_fn_execution_pb2.Schema.TINYINT:
return TinyIntType(field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.SMALLINT:
@@ -593,24 +597,6 @@ class DataViewFilterCoder(FieldCoder):
return coder_impl.DataViewFilterCoderImpl(self._udf_data_view_specs)
-type_name = flink_fn_execution_pb2.Schema
-_type_name_mappings = {
- type_name.TINYINT: TinyIntCoder(),
- type_name.SMALLINT: SmallIntCoder(),
- type_name.INT: IntCoder(),
- type_name.BIGINT: BigIntCoder(),
- type_name.BOOLEAN: BooleanCoder(),
- type_name.FLOAT: FloatCoder(),
- type_name.DOUBLE: DoubleCoder(),
- type_name.BINARY: BinaryCoder(),
- type_name.VARBINARY: BinaryCoder(),
- type_name.CHAR: CharCoder(),
- type_name.VARCHAR: CharCoder(),
- type_name.DATE: DateCoder(),
- type_name.TIME: TimeCoder(),
-}
-
-
def from_proto(field_type):
"""
Creates the corresponding :class:`Coder` given the protocol representation of the field type.
@@ -618,6 +604,25 @@ def from_proto(field_type):
:param field_type: the protocol representation of the field type
:return: :class:`Coder`
"""
+ from pyflink.fn_execution import flink_fn_execution_pb2
+
+ type_name = flink_fn_execution_pb2.Schema
+ _type_name_mappings = {
+ type_name.TINYINT: TinyIntCoder(),
+ type_name.SMALLINT: SmallIntCoder(),
+ type_name.INT: IntCoder(),
+ type_name.BIGINT: BigIntCoder(),
+ type_name.BOOLEAN: BooleanCoder(),
+ type_name.FLOAT: FloatCoder(),
+ type_name.DOUBLE: DoubleCoder(),
+ type_name.BINARY: BinaryCoder(),
+ type_name.VARBINARY: BinaryCoder(),
+ type_name.CHAR: CharCoder(),
+ type_name.VARCHAR: CharCoder(),
+ type_name.DATE: DateCoder(),
+ type_name.TIME: TimeCoder(),
+ }
+
field_type_name = field_type.type_name
coder = _type_name_mappings.get(field_type_name)
if coder is not None:
@@ -642,29 +647,30 @@ def from_proto(field_type):
raise ValueError("field_type %s is not supported." % field_type)
-# for data stream type information.
-type_info_name = flink_fn_execution_pb2.TypeInfo
-_type_info_name_mappings = {
- type_info_name.STRING: CharCoder(),
- type_info_name.BYTE: TinyIntCoder(),
- type_info_name.BOOLEAN: BooleanCoder(),
- type_info_name.SHORT: SmallIntCoder(),
- type_info_name.INT: IntCoder(),
- type_info_name.LONG: BigIntCoder(),
- type_info_name.FLOAT: FloatCoder(),
- type_info_name.DOUBLE: DoubleCoder(),
- type_info_name.CHAR: CharCoder(),
- type_info_name.BIG_INT: BigIntCoder(),
- type_info_name.BIG_DEC: BigDecimalCoder(),
- type_info_name.SQL_DATE: DateCoder(),
- type_info_name.SQL_TIME: TimeCoder(),
- type_info_name.SQL_TIMESTAMP: TimestampCoder(3),
- type_info_name.PICKLED_BYTES: CloudPickleCoder(),
- type_info_name.INSTANT: InstantCoder()
-}
-
-
def from_type_info_proto(type_info):
+ # for data stream type information.
+ from pyflink.fn_execution import flink_fn_execution_pb2
+
+ type_info_name = flink_fn_execution_pb2.TypeInfo
+ _type_info_name_mappings = {
+ type_info_name.STRING: CharCoder(),
+ type_info_name.BYTE: TinyIntCoder(),
+ type_info_name.BOOLEAN: BooleanCoder(),
+ type_info_name.SHORT: SmallIntCoder(),
+ type_info_name.INT: IntCoder(),
+ type_info_name.LONG: BigIntCoder(),
+ type_info_name.FLOAT: FloatCoder(),
+ type_info_name.DOUBLE: DoubleCoder(),
+ type_info_name.CHAR: CharCoder(),
+ type_info_name.BIG_INT: BigIntCoder(),
+ type_info_name.BIG_DEC: BigDecimalCoder(),
+ type_info_name.SQL_DATE: DateCoder(),
+ type_info_name.SQL_TIME: TimeCoder(),
+ type_info_name.SQL_TIMESTAMP: TimestampCoder(3),
+ type_info_name.PICKLED_BYTES: CloudPickleCoder(),
+ type_info_name.INSTANT: InstantCoder()
+ }
+
field_type_name = type_info.type_name
try:
return _type_info_name_mappings[field_type_name]
diff --git a/flink-python/pyflink/fn_execution/datastream/operations.py b/flink-python/pyflink/fn_execution/datastream/operations.py
index 3d36b30..afaed5a 100644
--- a/flink-python/pyflink/fn_execution/datastream/operations.py
+++ b/flink-python/pyflink/fn_execution/datastream/operations.py
@@ -17,19 +17,15 @@
################################################################################
import abc
-from apache_beam.runners.worker.bundle_processor import TimerInfo
-
from pyflink.common import Row
from pyflink.common.serializer import VoidNamespaceSerializer
from pyflink.datastream import TimeDomain, RuntimeContext
-from pyflink.fn_execution import flink_fn_execution_pb2
from pyflink.fn_execution import pickle
from pyflink.fn_execution.datastream.process_function import \
InternalKeyedProcessFunctionOnTimerContext, InternalKeyedProcessFunctionContext, \
InternalProcessFunctionContext
from pyflink.fn_execution.datastream.runtime_context import StreamingRuntimeContext
from pyflink.fn_execution.datastream.window.window_operator import WindowOperator
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
from pyflink.fn_execution.datastream.timerservice_impl import (
TimerServiceImpl, InternalTimerServiceImpl, NonKeyedTimerServiceImpl)
from pyflink.fn_execution.datastream.input_handler import (RunnerInputHandler, TimerHandler,
@@ -43,9 +39,8 @@ DATA_STREAM_STATEFUL_FUNCTION_URN = "flink:transform:ds:stateful_function:v1"
class Operation(abc.ABC):
- def __init__(self, spec):
- self.spec = spec
- if self.spec.serialized_fn.metric_enabled:
+ def __init__(self, serialized_fn):
+ if serialized_fn.metric_enabled:
self.base_metric_group = GenericMetricGroup(None, None)
else:
self.base_metric_group = None
@@ -74,13 +69,13 @@ class Operation(abc.ABC):
class StatelessOperation(Operation):
- def __init__(self, spec):
- super(StatelessOperation, self).__init__(spec)
+ def __init__(self, serialized_fn):
+ super(StatelessOperation, self).__init__(serialized_fn)
self.open_func, self.close_func, self.process_element_func = \
extract_stateless_function(
- user_defined_function_proto=self.spec.serialized_fn,
+ user_defined_function_proto=serialized_fn,
runtime_context=StreamingRuntimeContext.of(
- self.spec.serialized_fn.runtime_context,
+ serialized_fn.runtime_context,
self.base_metric_group))
def open(self):
@@ -95,15 +90,15 @@ class StatelessOperation(Operation):
class StatefulOperation(Operation):
- def __init__(self, spec, keyed_state_backend):
- super(StatefulOperation, self).__init__(spec)
+ def __init__(self, serialized_fn, keyed_state_backend):
+ super(StatefulOperation, self).__init__(serialized_fn)
self.keyed_state_backend = keyed_state_backend
self.open_func, self.close_func, self.process_element_func, self.process_timer_func, \
self.internal_timer_service = \
extract_stateful_function(
- user_defined_function_proto=self.spec.serialized_fn,
+ user_defined_function_proto=serialized_fn,
runtime_context=StreamingRuntimeContext.of(
- self.spec.serialized_fn.runtime_context,
+ serialized_fn.runtime_context,
self.base_metric_group,
self.keyed_state_backend),
keyed_state_backend=self.keyed_state_backend)
@@ -124,7 +119,7 @@ class StatefulOperation(Operation):
def process_timer(self, timer_data):
return self.process_timer_func(timer_data)
- def add_timer_info(self, timer_info: TimerInfo):
+ def add_timer_info(self, timer_info):
self.internal_timer_service.add_timer_info(timer_info)
@@ -136,6 +131,8 @@ def extract_stateless_function(user_defined_function_proto, runtime_context: Run
:param user_defined_function_proto: the proto representation of the Python :class:`Function`
:param runtime_context: the streaming runtime context
"""
+ from pyflink.fn_execution import flink_fn_execution_pb2
+
func_type = user_defined_function_proto.function_type
UserDefinedDataStreamFunction = flink_fn_execution_pb2.UserDefinedDataStreamFunction
@@ -210,7 +207,9 @@ def extract_stateless_function(user_defined_function_proto, runtime_context: Run
def extract_stateful_function(user_defined_function_proto,
runtime_context: RuntimeContext,
- keyed_state_backend: RemoteKeyedStateBackend):
+ keyed_state_backend):
+ from pyflink.fn_execution import flink_fn_execution_pb2
+
func_type = user_defined_function_proto.function_type
user_defined_func = pickle.loads(user_defined_function_proto.payload)
internal_timer_service = InternalTimerServiceImpl(keyed_state_backend)
diff --git a/flink-python/pyflink/fn_execution/datastream/runtime_context.py b/flink-python/pyflink/fn_execution/datastream/runtime_context.py
index 86969f2..07b4acb 100644
--- a/flink-python/pyflink/fn_execution/datastream/runtime_context.py
+++ b/flink-python/pyflink/fn_execution/datastream/runtime_context.py
@@ -15,14 +15,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from typing import Dict, Union
+from typing import Dict
from pyflink.datastream import RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor, ValueState, ListStateDescriptor, \
ListState, MapStateDescriptor, MapState, ReducingStateDescriptor, ReducingState, \
AggregatingStateDescriptor, AggregatingState
from pyflink.fn_execution.coders import from_type_info, MapCoder, GenericArrayCoder
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
from pyflink.metrics import MetricGroup
@@ -37,7 +36,7 @@ class StreamingRuntimeContext(RuntimeContext):
attempt_number: int,
job_parameters: Dict[str, str],
metric_group: MetricGroup,
- keyed_state_backend: Union[RemoteKeyedStateBackend, None],
+ keyed_state_backend,
in_batch_execution_mode: bool):
self._task_name = task_name
self._task_name_with_subtasks = task_name_with_subtasks
diff --git a/flink-python/pyflink/fn_execution/datastream/timerservice_impl.py b/flink-python/pyflink/fn_execution/datastream/timerservice_impl.py
index 46397af..cff83a4 100644
--- a/flink-python/pyflink/fn_execution/datastream/timerservice_impl.py
+++ b/flink-python/pyflink/fn_execution/datastream/timerservice_impl.py
@@ -20,14 +20,9 @@ import time
from enum import Enum
from io import BytesIO
-from apache_beam.runners.worker.bundle_processor import TimerInfo
-from apache_beam.transforms import userstate
-from apache_beam.transforms.window import GlobalWindow
-
from pyflink.common import Row
from pyflink.datastream import TimerService
from pyflink.fn_execution.datastream.timerservice import InternalTimer, K, N, InternalTimerService
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
class TimerOperandType(Enum):
@@ -44,7 +39,7 @@ class LegacyInternalTimerServiceImpl(InternalTimerService[N]):
TODO: Use InternalTimerServiceImpl instead.
"""
- def __init__(self, keyed_state_backend: RemoteKeyedStateBackend):
+ def __init__(self, keyed_state_backend):
self._keyed_state_backend = keyed_state_backend
self._current_watermark = None
self.timers = collections.OrderedDict()
@@ -85,14 +80,17 @@ class InternalTimerServiceImpl(InternalTimerService[N]):
Internal implementation of InternalTimerService.
"""
- def __init__(self, keyed_state_backend: RemoteKeyedStateBackend):
+ def __init__(self, keyed_state_backend):
self._keyed_state_backend = keyed_state_backend
self._current_watermark = None
self._timer_coder_impl = None
self._output_stream = None
+
+ from apache_beam.transforms.window import GlobalWindow
+
self._global_window = GlobalWindow()
- def add_timer_info(self, timer_info: TimerInfo):
+ def add_timer_info(self, timer_info):
self._timer_coder_impl = timer_info.timer_coder_impl
self._output_stream = timer_info.output_stream
@@ -125,6 +123,8 @@ class InternalTimerServiceImpl(InternalTimerService[N]):
self._set_timer(TimerOperandType.DELETE_EVENT_TIMER, ts, current_key, namespace)
def _set_timer(self, timer_operation_type, ts, key, namespace):
+ from apache_beam.transforms import userstate
+
bytes_io = BytesIO()
self._namespace_serializer.serialize(namespace, bytes_io)
encoded_namespace = bytes_io.getvalue()
diff --git a/flink-python/pyflink/fn_execution/datastream/window/window_operator.py b/flink-python/pyflink/fn_execution/datastream/window/window_operator.py
index 0ec8873..be172f2 100644
--- a/flink-python/pyflink/fn_execution/datastream/window/window_operator.py
+++ b/flink-python/pyflink/fn_execution/datastream/window/window_operator.py
@@ -28,7 +28,6 @@ from pyflink.datastream.window import MAX_LONG_VALUE
from pyflink.fn_execution.datastream.window.merging_window_set import MergingWindowSet
from pyflink.fn_execution.internal_state import InternalMergingState, InternalKvState, \
InternalAppendingState
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
from pyflink.metrics import MetricGroup
T = TypeVar("T")
@@ -269,7 +268,7 @@ class WindowOperator(object):
def __init__(self,
window_assigner: WindowAssigner,
- keyed_state_backend: RemoteKeyedStateBackend,
+ keyed_state_backend,
user_key_selector,
window_state_descriptor: StateDescriptor,
window_function: InternalWindowFunction,
diff --git a/flink-python/pyflink/fn_execution/table/aggregate_fast.pyx b/flink-python/pyflink/fn_execution/table/aggregate_fast.pyx
index 265552b..2db06c7 100644
--- a/flink-python/pyflink/fn_execution/table/aggregate_fast.pyx
+++ b/flink-python/pyflink/fn_execution/table/aggregate_fast.pyx
@@ -26,7 +26,6 @@ from pyflink.common import Row
from pyflink.fn_execution.coders import PickleCoder
from pyflink.fn_execution.table.state_data_view import DataViewSpec, ListViewSpec, MapViewSpec, \
PerKeyStateDataViewStore
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
from pyflink.table import AggregateFunction, TableAggregateFunction
cdef InternalRow join_row(list left, list right, InternalRowKind row_kind):
@@ -435,7 +434,7 @@ cdef class GroupAggFunctionBase:
def __init__(self,
aggs_handle: AggsHandleFunctionBase,
key_selector: RowKeySelector,
- state_backend: RemoteKeyedStateBackend,
+ state_backend,
state_value_coder,
generate_update_before: bool,
state_cleaning_enabled: bool,
@@ -479,7 +478,7 @@ cdef class GroupAggFunction(GroupAggFunctionBase):
def __init__(self,
aggs_handle,
key_selector: RowKeySelector,
- state_backend: RemoteKeyedStateBackend,
+ state_backend,
state_value_coder,
generate_update_before: bool,
state_cleaning_enabled: bool,
@@ -586,7 +585,7 @@ cdef class GroupTableAggFunction(GroupAggFunctionBase):
def __init__(self,
aggs_handle,
key_selector: RowKeySelector,
- state_backend: RemoteKeyedStateBackend,
+ state_backend,
state_value_coder,
generate_update_before: bool,
state_cleaning_enabled: bool,
diff --git a/flink-python/pyflink/fn_execution/table/aggregate_slow.py b/flink-python/pyflink/fn_execution/table/aggregate_slow.py
index d847d53..e10f6b0 100644
--- a/flink-python/pyflink/fn_execution/table/aggregate_slow.py
+++ b/flink-python/pyflink/fn_execution/table/aggregate_slow.py
@@ -22,7 +22,6 @@ from pyflink.common import Row, RowKind
from pyflink.fn_execution.coders import PickleCoder
from pyflink.fn_execution.table.state_data_view import DataViewSpec, ListViewSpec, MapViewSpec, \
PerKeyStateDataViewStore
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
from pyflink.table import AggregateFunction, FunctionContext, TableAggregateFunction
from pyflink.table.udf import ImperativeAggregateFunction
@@ -409,7 +408,7 @@ class GroupAggFunctionBase(object):
def __init__(self,
aggs_handle: AggsHandleFunctionBase,
key_selector: RowKeySelector,
- state_backend: RemoteKeyedStateBackend,
+ state_backend,
state_value_coder,
generate_update_before: bool,
state_cleaning_enabled: bool,
@@ -455,7 +454,7 @@ class GroupAggFunction(GroupAggFunctionBase):
def __init__(self,
aggs_handle: AggsHandleFunction,
key_selector: RowKeySelector,
- state_backend: RemoteKeyedStateBackend,
+ state_backend,
state_value_coder,
generate_update_before: bool,
state_cleaning_enabled: bool,
@@ -550,7 +549,7 @@ class GroupTableAggFunction(GroupAggFunctionBase):
def __init__(self,
aggs_handle: TableAggsHandleFunction,
key_selector: RowKeySelector,
- state_backend: RemoteKeyedStateBackend,
+ state_backend,
state_value_coder,
generate_update_before: bool,
state_cleaning_enabled: bool,
diff --git a/flink-python/pyflink/fn_execution/table/operations.py b/flink-python/pyflink/fn_execution/table/operations.py
index ab0f72a..3f4069f 100644
--- a/flink-python/pyflink/fn_execution/table/operations.py
+++ b/flink-python/pyflink/fn_execution/table/operations.py
@@ -24,7 +24,6 @@ from pyflink.fn_execution.coders import DataViewFilterCoder, PickleCoder
from pyflink.fn_execution.datastream.timerservice import InternalTimer
from pyflink.fn_execution.datastream.operations import Operation
from pyflink.fn_execution.datastream.timerservice_impl import TimerOperandType, InternalTimerImpl
-from pyflink.fn_execution import flink_fn_execution_pb2
from pyflink.fn_execution.table.state_data_view import extract_data_view_specs
from pyflink.fn_execution.table.window_assigner import TumblingWindowAssigner, \
@@ -76,9 +75,9 @@ class BundleOperation(object):
class BaseOperation(Operation):
- def __init__(self, spec):
- super(BaseOperation, self).__init__(spec)
- self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn)
+ def __init__(self, serialized_fn):
+ super(BaseOperation, self).__init__(serialized_fn)
+ self.func, self.user_defined_funcs = self.generate_func(serialized_fn)
def process_element(self, value):
return self.func(value)
@@ -99,8 +98,10 @@ class BaseOperation(Operation):
class ScalarFunctionOperation(BaseOperation):
- def __init__(self, spec):
- super(ScalarFunctionOperation, self).__init__(spec)
+ def __init__(self, serialized_fn, one_arg_optimization=False, one_result_optimization=False):
+ self._one_arg_optimization = one_arg_optimization
+ self._one_result_optimization = one_result_optimization
+ super(ScalarFunctionOperation, self).__init__(serialized_fn)
def generate_func(self, serialized_fn):
"""
@@ -114,14 +115,20 @@ class ScalarFunctionOperation(BaseOperation):
','.join([x[0], y[0]]),
dict(chain(x[1].items(), y[1].items())),
x[2] + y[2]),
- [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
- generate_func = eval('lambda value: [%s]' % scalar_functions, variable_dict)
+ [operation_utils.extract_user_defined_function(
+ udf, one_arg_optimization=self._one_arg_optimization)
+ for udf in serialized_fn.udfs])
+ if self._one_result_optimization:
+ func_str = 'lambda value: %s' % scalar_functions
+ else:
+ func_str = 'lambda value: [%s]' % scalar_functions
+ generate_func = eval(func_str, variable_dict)
return generate_func, user_defined_funcs
class TableFunctionOperation(BaseOperation):
- def __init__(self, spec):
- super(TableFunctionOperation, self).__init__(spec)
+ def __init__(self, serialized_fn):
+ super(TableFunctionOperation, self).__init__(serialized_fn)
def generate_func(self, serialized_fn):
"""
@@ -140,8 +147,8 @@ class TableFunctionOperation(BaseOperation):
class PandasAggregateFunctionOperation(BaseOperation):
- def __init__(self, spec):
- super(PandasAggregateFunctionOperation, self).__init__(spec)
+ def __init__(self, serialized_fn):
+ super(PandasAggregateFunctionOperation, self).__init__(serialized_fn)
def generate_func(self, serialized_fn):
pandas_functions, variable_dict, user_defined_funcs = reduce(
@@ -158,13 +165,16 @@ class PandasAggregateFunctionOperation(BaseOperation):
class PandasBatchOverWindowAggregateFunctionOperation(BaseOperation):
- def __init__(self, spec):
- super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(spec)
- self.windows = [window for window in self.spec.serialized_fn.windows]
+ def __init__(self, serialized_fn):
+ super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(serialized_fn)
+ self.windows = [window for window in serialized_fn.windows]
# the index among all the bounded range over window
self.bounded_range_window_index = [-1 for _ in range(len(self.windows))]
# Whether the specified position window is a bounded range window.
self.is_bounded_range_window = []
+
+ from pyflink.fn_execution import flink_fn_execution_pb2
+
window_types = flink_fn_execution_pb2.OverWindow
bounded_range_window_nums = 0
@@ -193,6 +203,8 @@ class PandasBatchOverWindowAggregateFunctionOperation(BaseOperation):
def wrapped_over_window_function(self, boundaries_series):
import pandas as pd
+ from pyflink.fn_execution import flink_fn_execution_pb2
+
OverWindow = flink_fn_execution_pb2.OverWindow
input_series = boundaries_series[-1]
# the row number of the arrow format data
@@ -264,9 +276,9 @@ class PandasBatchOverWindowAggregateFunctionOperation(BaseOperation):
class BaseStatefulOperation(BaseOperation, abc.ABC):
- def __init__(self, spec, keyed_state_backend):
+ def __init__(self, serialized_fn, keyed_state_backend):
self.keyed_state_backend = keyed_state_backend
- super(BaseStatefulOperation, self).__init__(spec)
+ super(BaseStatefulOperation, self).__init__(serialized_fn)
def finish(self):
super().finish()
@@ -282,19 +294,20 @@ REGISTER_PROCESSING_TIMER = 1
class AbstractStreamGroupAggregateOperation(BaseStatefulOperation):
- def __init__(self, spec, keyed_state_backend):
- self.generate_update_before = spec.serialized_fn.generate_update_before
- self.grouping = [i for i in spec.serialized_fn.grouping]
+ def __init__(self, serialized_fn, keyed_state_backend):
+ self.generate_update_before = serialized_fn.generate_update_before
+ self.grouping = [i for i in serialized_fn.grouping]
self.group_agg_function = None
# If the upstream generates retract message, we need to add an additional count1() agg
# to track current accumulated messages count. If all the messages are retracted, we need
# to send a DELETE message to downstream.
- self.index_of_count_star = spec.serialized_fn.index_of_count_star
- self.count_star_inserted = spec.serialized_fn.count_star_inserted
- self.state_cache_size = spec.serialized_fn.state_cache_size
- self.state_cleaning_enabled = spec.serialized_fn.state_cleaning_enabled
- self.data_view_specs = extract_data_view_specs(spec.serialized_fn.udfs)
- super(AbstractStreamGroupAggregateOperation, self).__init__(spec, keyed_state_backend)
+ self.index_of_count_star = serialized_fn.index_of_count_star
+ self.count_star_inserted = serialized_fn.count_star_inserted
+ self.state_cache_size = serialized_fn.state_cache_size
+ self.state_cleaning_enabled = serialized_fn.state_cleaning_enabled
+ self.data_view_specs = extract_data_view_specs(serialized_fn.udfs)
+ super(AbstractStreamGroupAggregateOperation, self).__init__(
+ serialized_fn, keyed_state_backend)
def open(self):
self.group_agg_function.open(FunctionContext(self.base_metric_group))
@@ -363,8 +376,8 @@ class AbstractStreamGroupAggregateOperation(BaseStatefulOperation):
class StreamGroupAggregateOperation(AbstractStreamGroupAggregateOperation, BundleOperation):
- def __init__(self, spec, keyed_state_backend):
- super(StreamGroupAggregateOperation, self).__init__(spec, keyed_state_backend)
+ def __init__(self, serialized_fn, keyed_state_backend):
+ super(StreamGroupAggregateOperation, self).__init__(serialized_fn, keyed_state_backend)
def finish_bundle(self):
return self.group_agg_function.finish_bundle()
@@ -393,8 +406,8 @@ class StreamGroupAggregateOperation(AbstractStreamGroupAggregateOperation, Bundl
class StreamGroupTableAggregateOperation(AbstractStreamGroupAggregateOperation, BundleOperation):
- def __init__(self, spec, keyed_state_backend):
- super(StreamGroupTableAggregateOperation, self).__init__(spec, keyed_state_backend)
+ def __init__(self, serialized_fn, keyed_state_backend):
+ super(StreamGroupTableAggregateOperation, self).__init__(serialized_fn, keyed_state_backend)
def finish_bundle(self):
return self.group_agg_function.finish_bundle()
@@ -420,17 +433,20 @@ class StreamGroupTableAggregateOperation(AbstractStreamGroupAggregateOperation,
class StreamGroupWindowAggregateOperation(AbstractStreamGroupAggregateOperation):
- def __init__(self, spec, keyed_state_backend):
- self._window = spec.serialized_fn.group_window
+ def __init__(self, serialized_fn, keyed_state_backend):
+ self._window = serialized_fn.group_window
self._named_property_extractor = self._create_named_property_function()
self._is_time_window = None
self._reuse_timer_data = Row()
self._reuse_key_data = Row()
- super(StreamGroupWindowAggregateOperation, self).__init__(spec, keyed_state_backend)
+ super(StreamGroupWindowAggregateOperation, self).__init__(
+ serialized_fn, keyed_state_backend)
def create_process_function(self, user_defined_aggs, input_extractors, filter_args,
distinct_indexes, distinct_view_descriptors, key_selector,
state_value_coder):
+ from pyflink.fn_execution import flink_fn_execution_pb2
+
self._is_time_window = self._window.is_time_window
self._namespace_coder = self.keyed_state_backend._namespace_coder_impl
if self._window.window_type == flink_fn_execution_pb2.GroupWindow.TUMBLING_GROUP_WINDOW:
@@ -515,6 +531,8 @@ class StreamGroupWindowAggregateOperation(AbstractStreamGroupAggregateOperation)
yield [NORMAL_RECORD, result_data, None]
def _create_named_property_function(self):
+ from pyflink.fn_execution import flink_fn_execution_pb2
+
named_property_extractor_array = []
for named_property in self._window.namedProperties:
if named_property == flink_fn_execution_pb2.GroupWindow.WINDOW_START:
diff --git a/flink-python/pyflink/fn_execution/table/state_data_view.py b/flink-python/pyflink/fn_execution/table/state_data_view.py
index 0388e63..bc6c9dd 100644
--- a/flink-python/pyflink/fn_execution/table/state_data_view.py
+++ b/flink-python/pyflink/fn_execution/table/state_data_view.py
@@ -22,7 +22,6 @@ from pyflink.datastream.state import ListState, MapState
from pyflink.fn_execution.coders import from_proto, PickleCoder
from pyflink.fn_execution.internal_state import InternalListState, InternalMapState
from pyflink.fn_execution.utils.operation_utils import is_built_in_function, load_aggregate_function
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
from pyflink.table import FunctionContext
from pyflink.table.data_view import ListView, MapView, DataView
@@ -230,7 +229,7 @@ class StateDataViewStore(ABC):
def __init__(self,
function_context: FunctionContext,
- keyed_state_backend: RemoteKeyedStateBackend):
+ keyed_state_backend):
self._function_context = function_context
self._keyed_state_backend = keyed_state_backend
@@ -268,7 +267,7 @@ class PerKeyStateDataViewStore(StateDataViewStore):
def __init__(self,
function_context: FunctionContext,
- keyed_state_backend: RemoteKeyedStateBackend):
+ keyed_state_backend):
super(PerKeyStateDataViewStore, self).__init__(function_context, keyed_state_backend)
def get_state_list_view(self, state_name, element_coder):
@@ -289,7 +288,7 @@ class PerWindowStateDataViewStore(StateDataViewStore):
def __init__(self,
function_context: FunctionContext,
- keyed_state_backend: RemoteKeyedStateBackend):
+ keyed_state_backend):
super(PerWindowStateDataViewStore, self).__init__(function_context, keyed_state_backend)
def get_state_list_view(self, state_name, element_coder):
diff --git a/flink-python/pyflink/fn_execution/table/window_aggregate_fast.pyx b/flink-python/pyflink/fn_execution/table/window_aggregate_fast.pyx
index 2704a4a..7a21249 100644
--- a/flink-python/pyflink/fn_execution/table/window_aggregate_fast.pyx
+++ b/flink-python/pyflink/fn_execution/table/window_aggregate_fast.pyx
@@ -34,7 +34,6 @@ from pyflink.fn_execution.datastream.timerservice_impl import LegacyInternalTime
from pyflink.fn_execution.coders import PickleCoder
from pyflink.fn_execution.table.state_data_view import DataViewSpec, ListViewSpec, MapViewSpec, \
PerWindowStateDataViewStore
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
from pyflink.fn_execution.table.window_assigner import WindowAssigner, PanedWindowAssigner, \
MergingWindowAssigner
from pyflink.fn_execution.table.window_context import WindowContext, TriggerContext, K, W
@@ -329,7 +328,7 @@ cdef class GroupWindowAggFunctionBase:
def __init__(self,
allowed_lateness: int,
key_selector: RowKeySelector,
- state_backend: RemoteKeyedStateBackend,
+ state_backend,
state_value_coder,
window_assigner: WindowAssigner[W],
window_aggregator: NamespaceAggsHandleFunctionBase,
@@ -510,7 +509,7 @@ cdef class GroupWindowAggFunction(GroupWindowAggFunctionBase):
def __init__(self,
allowed_lateness: int,
key_selector: RowKeySelector,
- state_backend: RemoteKeyedStateBackend,
+ state_backend,
state_value_coder,
window_assigner: WindowAssigner[W],
window_aggregator: NamespaceAggsHandleFunction[W],
diff --git a/flink-python/pyflink/fn_execution/table/window_aggregate_slow.py b/flink-python/pyflink/fn_execution/table/window_aggregate_slow.py
index 5ed146d..a7e557b 100644
--- a/flink-python/pyflink/fn_execution/table/window_aggregate_slow.py
+++ b/flink-python/pyflink/fn_execution/table/window_aggregate_slow.py
@@ -29,7 +29,6 @@ from pyflink.fn_execution.coders import PickleCoder
from pyflink.fn_execution.table.aggregate_slow import DistinctViewDescriptor, RowKeySelector
from pyflink.fn_execution.table.state_data_view import DataViewSpec, ListViewSpec, MapViewSpec, \
PerWindowStateDataViewStore
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
from pyflink.fn_execution.table.window_assigner import WindowAssigner, PanedWindowAssigner, \
MergingWindowAssigner
from pyflink.fn_execution.table.window_context import WindowContext, TriggerContext, K, W
@@ -289,7 +288,7 @@ class GroupWindowAggFunctionBase(Generic[K, W]):
def __init__(self,
allowed_lateness: int,
key_selector: RowKeySelector,
- state_backend: RemoteKeyedStateBackend,
+ state_backend,
state_value_coder,
window_assigner: WindowAssigner[W],
window_aggregator: NamespaceAggsHandleFunctionBase[W],
@@ -456,7 +455,7 @@ class GroupWindowAggFunction(GroupWindowAggFunctionBase[K, W]):
def __init__(self,
allowed_lateness: int,
key_selector: RowKeySelector,
- state_backend: RemoteKeyedStateBackend,
+ state_backend,
state_value_coder,
window_assigner: WindowAssigner[W],
window_aggregator: NamespaceAggsHandleFunction[W],
diff --git a/flink-python/pyflink/fn_execution/table/window_context.py b/flink-python/pyflink/fn_execution/table/window_context.py
index fe64f7f..2e1946e 100644
--- a/flink-python/pyflink/fn_execution/table/window_context.py
+++ b/flink-python/pyflink/fn_execution/table/window_context.py
@@ -19,15 +19,12 @@ import sys
from abc import ABC, abstractmethod
from typing import Generic, TypeVar, List, Iterable
-from apache_beam.coders import Coder
-
from pyflink.datastream.state import StateDescriptor, State, ValueStateDescriptor, \
ListStateDescriptor, MapStateDescriptor
from pyflink.datastream.window import TimeWindow, CountWindow
from pyflink.fn_execution.datastream.timerservice_impl import LegacyInternalTimerServiceImpl
from pyflink.fn_execution.coders import from_type_info, MapCoder, GenericArrayCoder
from pyflink.fn_execution.internal_state import InternalMergingState
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
MAX_LONG_VALUE = sys.maxsize
@@ -119,8 +116,8 @@ class WindowContext(Context[K, W]):
def __init__(self,
window_operator,
trigger_context: 'TriggerContext',
- state_backend: RemoteKeyedStateBackend,
- state_value_coder: Coder,
+ state_backend,
+ state_value_coder,
timer_service: LegacyInternalTimerServiceImpl,
is_event_time: bool):
self._window_operator = window_operator
@@ -183,7 +180,7 @@ class TriggerContext(object):
def __init__(self,
trigger,
timer_service: LegacyInternalTimerServiceImpl[W],
- state_backend: RemoteKeyedStateBackend):
+ state_backend):
self._trigger = trigger
self._timer_service = timer_service
self._state_backend = state_backend
diff --git a/flink-python/pyflink/fn_execution/table/window_process_function.py b/flink-python/pyflink/fn_execution/table/window_process_function.py
index cfe0d2f..733d4c1 100644
--- a/flink-python/pyflink/fn_execution/table/window_process_function.py
+++ b/flink-python/pyflink/fn_execution/table/window_process_function.py
@@ -21,7 +21,6 @@ from typing import Generic, List, Iterable, Dict, Set
from pyflink.common import Row
from pyflink.datastream.state import MapState
-from pyflink.fn_execution.state_impl import LRUCache
from pyflink.fn_execution.table.window_assigner import WindowAssigner, PanedWindowAssigner, \
MergingWindowAssigner
from pyflink.fn_execution.table.window_context import Context, K, W
@@ -236,6 +235,9 @@ class MergingWindowProcessFunction(InternalWindowProcessFunction[K, W]):
self._window_mapping = None # type: MapState
self._state_backend = state_backend
self._sorted_windows = None # type: List
+
+ from pyflink.fn_execution.state_impl import LRUCache
+
self._cached_sorted_windows = LRUCache(10000, None)
def open(self, ctx: Context[K, W]):
diff --git a/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py b/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
index 119c887..c98a0b8 100644
--- a/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
+++ b/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
@@ -105,7 +105,7 @@ class PythonBootTests(PyFlinkTestCase):
def test_set_working_directory(self):
JProcessPythonEnvironmentManager = \
- get_gateway().jvm.org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager
+ get_gateway().jvm.org.apache.flink.python.env.process.ProcessPythonEnvironmentManager
output_file = os.path.join(self.tmp_dir, "output.txt")
pyflink_dir = os.path.join(self.tmp_dir, "pyflink")
diff --git a/flink-python/pyflink/fn_execution/utils/operation_utils.py b/flink-python/pyflink/fn_execution/utils/operation_utils.py
index 9ed177d..3f47ca0 100644
--- a/flink-python/pyflink/fn_execution/utils/operation_utils.py
+++ b/flink-python/pyflink/fn_execution/utils/operation_utils.py
@@ -93,7 +93,8 @@ def extract_over_window_user_defined_function(user_defined_function_proto):
return (*extract_user_defined_function(user_defined_function_proto, True), window_index)
-def extract_user_defined_function(user_defined_function_proto, pandas_udaf=False)\
+def extract_user_defined_function(user_defined_function_proto, pandas_udaf=False,
+ one_arg_optimization=False)\
-> Tuple[str, Dict, List]:
"""
Extracts user-defined-function from the proto representation of a
@@ -101,6 +102,7 @@ def extract_user_defined_function(user_defined_function_proto, pandas_udaf=False
:param user_defined_function_proto: the proto representation of the Python
:param pandas_udaf: whether the user_defined_function_proto is pandas udaf
+ :param one_arg_optimization: whether to pass the input value directly as the only argument
:class:`UserDefinedFunction`
"""
@@ -116,13 +118,17 @@ def extract_user_defined_function(user_defined_function_proto, pandas_udaf=False
for arg in args:
if arg.HasField("udf"):
# for chaining Python UDF input: the input argument is a Python ScalarFunction
- udf_arg, udf_variable_dict, udf_funcs = extract_user_defined_function(arg.udf)
+ udf_arg, udf_variable_dict, udf_funcs = extract_user_defined_function(
+ arg.udf, one_arg_optimization=one_arg_optimization)
args_str.append(udf_arg)
local_variable_dict.update(udf_variable_dict)
local_funcs.extend(udf_funcs)
elif arg.HasField("inputOffset"):
- # the input argument is a column of the input row
- args_str.append("value[%s]" % arg.inputOffset)
+ if one_arg_optimization:
+ args_str.append("value")
+ else:
+ # the input argument is a column of the input row
+ args_str.append("value[%s]" % arg.inputOffset)
else:
# the input argument is a constant value
constant_value_name, parsed_constant_value = \
@@ -275,6 +281,43 @@ def load_aggregate_function(payload):
return pickle.loads(payload)
+def parse_function_proto(proto):
+ from pyflink.fn_execution import flink_fn_execution_pb2
+ serialized_fn = flink_fn_execution_pb2.UserDefinedFunctions()
+ serialized_fn.ParseFromString(proto)
+ return serialized_fn
+
+
+def deserialized_operation_from_serialized_bytes(b):
+ import cloudpickle
+ return cloudpickle.loads(b)
+
+
+def create_scalar_operation_from_proto(proto, one_arg_optimization=False,
+ one_result_optimization=False):
+ from pyflink.fn_execution.table.operations import ScalarFunctionOperation
+
+ serialized_fn = parse_function_proto(proto)
+ scalar_operation = ScalarFunctionOperation(
+ serialized_fn, one_arg_optimization, one_result_optimization)
+ return scalar_operation
+
+
+def create_serialized_scalar_operation_from_proto(proto, one_arg_optimization=False,
+ one_result_optimization=False):
+ """
+ The CPython extension included in proto does not support being initialized multiple times, so
+ a single interpreter is made responsible for initialization and proto parsing.
+ That interpreter parses the proto and serializes the function operations with cloudpickle.
+ """
+
+ import cloudpickle
+
+ scalar_operation = create_scalar_operation_from_proto(
+ bytes(b % 256 for b in proto), one_arg_optimization, one_result_optimization)
+ return cloudpickle.dumps(scalar_operation)
+
+
class PeriodicThread(threading.Thread):
"""Call a function periodically with the specified number of seconds"""
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index 22c3059..928842a 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -21,6 +21,8 @@ import sys
import unittest
import uuid
+import pytest
+
from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table import expressions as expr
from pyflink.table.udf import udf
@@ -63,6 +65,48 @@ class DependencyTests(object):
actual = source_sink_utils.results()
self.assert_equals(actual, ["+I[3, 1]", "+I[4, 2]", "+I[5, 3]"])
+ def test_add_python_archive(self):
+ tmp_dir = self.tempdir
+ archive_dir_path = os.path.join(tmp_dir, "archive_" + str(uuid.uuid4()))
+ os.mkdir(archive_dir_path)
+ with open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
+ f.write("2")
+ archive_file_path = \
+ shutil.make_archive(os.path.dirname(archive_dir_path), 'zip', archive_dir_path)
+ self.t_env.add_python_archive(archive_file_path, "data")
+
+ def add_from_file(i):
+ with open("data/data.txt", 'r') as f:
+ return i + int(f.read())
+
+ self.t_env.create_temporary_system_function("add_from_file",
+ udf(add_from_file, DataTypes.BIGINT(),
+ DataTypes.BIGINT()))
+ table_sink = source_sink_utils.TestAppendSink(
+ ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
+ self.t_env.register_table_sink("Results", table_sink)
+ t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
+ t.select(expr.call('add_from_file', t.a), t.a).execute_insert("Results").wait()
+
+ actual = source_sink_utils.results()
+ self.assert_equals(actual, ["+I[3, 1]", "+I[4, 2]", "+I[5, 3]"])
+
+
+@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
+class EmbeddedMultiThreadDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
+ def setUp(self):
+ super(EmbeddedMultiThreadDependencyTests, self).setUp()
+ self.t_env.get_config().get_configuration().set_string("python.execution-mode",
+ "multi-thread")
+
+
+@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
+class EmbeddedSubInterpreterDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
+ def setUp(self):
+ super(EmbeddedSubInterpreterDependencyTests, self).setUp()
+ self.t_env.get_config().get_configuration().set_string("python.execution-mode",
+ "sub-interpreter")
+
class BatchDependencyTests(DependencyTests, PyFlinkBatchTableTestCase):
@@ -150,32 +194,6 @@ class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
actual = source_sink_utils.results()
self.assert_equals(actual, ["+I[2, 1]", "+I[3, 2]", "+I[4, 3]"])
- def test_add_python_archive(self):
- tmp_dir = self.tempdir
- archive_dir_path = os.path.join(tmp_dir, "archive_" + str(uuid.uuid4()))
- os.mkdir(archive_dir_path)
- with open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
- f.write("2")
- archive_file_path = \
- shutil.make_archive(os.path.dirname(archive_dir_path), 'zip', archive_dir_path)
- self.t_env.add_python_archive(archive_file_path, "data")
-
- def add_from_file(i):
- with open("data/data.txt", 'r') as f:
- return i + int(f.read())
-
- self.t_env.create_temporary_system_function("add_from_file",
- udf(add_from_file, DataTypes.BIGINT(),
- DataTypes.BIGINT()))
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
- t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
- t.select(expr.call('add_from_file', t.a), t.a).execute_insert("Results").wait()
-
- actual = source_sink_utils.results()
- self.assert_equals(actual, ["+I[3, 1]", "+I[4, 2]", "+I[5, 3]"])
-
def test_set_environment(self):
python_exec_link_path = sys.executable
self.st_env.get_config().set_python_executable(python_exec_link_path)
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index a885118..283143b 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -17,8 +17,10 @@
################################################################################
import datetime
import os
+import sys
import unittest
+import pytest
import pytz
from pyflink.table import DataTypes, expressions as expr
@@ -52,8 +54,9 @@ class UserDefinedFunctionTests(object):
# check memory limit is set
@udf(result_type=DataTypes.BIGINT())
- def check_memory_limit():
- assert os.environ['_PYTHON_WORKER_MEMORY_LIMIT'] is not None
+ def check_memory_limit(exec_mode):
+ if exec_mode == "process":
+ assert os.environ['_PYTHON_WORKER_MEMORY_LIMIT'] is not None
return 1
table_sink = source_sink_utils.TestAppendSink(
@@ -62,10 +65,13 @@ class UserDefinedFunctionTests(object):
DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()])
self.t_env.register_table_sink("Results", table_sink)
+ execution_mode = self.t_env.get_config().get_configuration().get_string(
+ "python.execution-mode", "process")
+
t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
t.where(add_one(t.b) <= 3).select(
add_one(t.a), subtract_one(t.b), add(t.a, t.c), add_one_callable(t.a),
- add_one_partial(t.a), check_memory_limit(), t.a) \
+ add_one_partial(t.a), check_memory_limit(execution_mode), t.a) \
.execute_insert("Results").wait()
actual = source_sink_utils.results()
self.assert_equals(actual, ["+I[2, 1, 4, 2, 2, 1, 1]", "+I[4, 0, 12, 4, 4, 1, 3]"])
@@ -217,7 +223,14 @@ class UserDefinedFunctionTests(object):
def test_open(self):
self.t_env.get_config().get_configuration().set_string('python.metric.enabled', 'true')
- subtract = udf(Subtract(), result_type=DataTypes.BIGINT())
+ execution_mode = self.t_env.get_config().get_configuration().get_string(
+ "python.execution-mode", None)
+
+ if execution_mode == "process":
+ subtract = udf(SubtractWithMetrics(), result_type=DataTypes.BIGINT())
+ else:
+ subtract = udf(Subtract(), result_type=DataTypes.BIGINT())
+
table_sink = source_sink_utils.TestAppendSink(
['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
self.t_env.register_table_sink("Results", table_sink)
@@ -783,6 +796,22 @@ class PyFlinkBatchUserDefinedFunctionTests(UserDefinedFunctionTests,
pass
+@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
+class PyFlinkEmbeddedMultiThreadTests(UserDefinedFunctionTests, PyFlinkBatchTableTestCase):
+ def setUp(self):
+ super(PyFlinkEmbeddedMultiThreadTests, self).setUp()
+ self.t_env.get_config().get_configuration().set_string("python.execution-mode",
+ "multi-thread")
+
+
+@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
+class PyFlinkEmbeddedSubInterpreterTests(UserDefinedFunctionTests, PyFlinkBatchTableTestCase):
+ def setUp(self):
+ super(PyFlinkEmbeddedSubInterpreterTests, self).setUp()
+ self.t_env.get_config().get_configuration().set_string("python.execution-mode",
+ "sub-interpreter")
+
+
# test specify the input_types
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
@@ -795,7 +824,7 @@ class SubtractOne(ScalarFunction):
return i - 1
-class Subtract(ScalarFunction, unittest.TestCase):
+class SubtractWithMetrics(ScalarFunction, unittest.TestCase):
def open(self, function_context):
self.subtracted_value = 1
@@ -810,6 +839,18 @@ class Subtract(ScalarFunction, unittest.TestCase):
return i - self.subtracted_value
+class Subtract(ScalarFunction, unittest.TestCase):
+
+ def open(self, function_context):
+ self.subtracted_value = 1
+ self.counter_sum = 0
+
+ def eval(self, i):
+ # counter
+ self.counter_sum += i
+ return i - self.subtracted_value
+
+
class CallablePlus(object):
def __call__(self, col):
diff --git a/flink-python/setup.py b/flink-python/setup.py
index dae7e64..6bd3f97 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -313,6 +313,7 @@ try:
'pandas>=1.0,<1.2.0', 'pyarrow>=0.15.1,<3.0.0',
'pytz>=2018.3', 'numpy>=1.14.3,<1.20', 'fastavro>=0.21.4,<0.24',
'requests>=2.26.0', 'protobuf<3.18',
+ 'pemja==0.1.2;python_full_version >= "3.7"',
apache_flink_libraries_dependency],
cmdclass={'build_ext': build_ext},
tests_require=['pytest==4.4.1'],
diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java b/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
index d862167..5195f1f 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
@@ -89,6 +89,9 @@ public class PythonConfig implements Serializable {
/** Whether profile is enabled. */
private final boolean profileEnabled;
+ /** Execution Mode. */
+ private final String executionMode;
+
public PythonConfig(Configuration config) {
this.config = config;
maxBundleSize = config.get(PythonOptions.MAX_BUNDLE_SIZE);
@@ -110,6 +113,7 @@ public class PythonConfig implements Serializable {
metricEnabled = config.getBoolean(PythonOptions.PYTHON_METRIC_ENABLED);
isUsingManagedMemory = config.getBoolean(PythonOptions.USE_MANAGED_MEMORY);
profileEnabled = config.getBoolean(PythonOptions.PYTHON_PROFILE_ENABLED);
+ executionMode = config.getString(PythonOptions.PYTHON_EXECUTION_MODE);
}
public int getMaxBundleSize() {
@@ -144,6 +148,10 @@ public class PythonConfig implements Serializable {
return pythonExec;
}
+ public String getExecutionMode() {
+ return executionMode;
+ }
+
public boolean isMetricEnabled() {
return metricEnabled;
}
diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index de68482..874a8d0 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -214,4 +214,17 @@ public class PythonOptions {
+ "in each batch when iterating a Python MapState. Note that this is an experimental flag "
+ "and might not be available "
+ "in future releases.");
+
+ /** Specify the python runtime execution mode. */
+ @Experimental
+ public static final ConfigOption<String> PYTHON_EXECUTION_MODE =
+ ConfigOptions.key("python.execution-mode")
+ .defaultValue("process")
+ .withDescription(
+ "Specify the python runtime execution mode. The optional values are `process`, `multi-thread` and `sub-interpreter`. "
+ + "The `process` mode means that the Python user-defined functions will be executed in separate Python process. "
+ + "The `multi-thread` mode means that the Python user-defined functions will be executed in the same thread as Java Operator, but it will be affected by GIL performance. "
+ + "The `sub-interpreter` mode means that the Python user-defined functions will be executed in python different sub-interpreters rather than different threads of one interpreter, "
+ + "which can largely overcome the effects of the GIL, but it maybe fail in some CPython extensions libraries, such as numpy, tensorflow. "
+ + "Note that if the python operator dose not support `multi-thread` and `sub-interpreter` mode, we will still use `process` mode.");
}
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
similarity index 74%
rename from flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java
rename to flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
index ac63769..2ab0fac 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
@@ -16,16 +16,12 @@
* limitations under the License.
*/
-package org.apache.flink.python.env.beam;
+package org.apache.flink.python.env;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.python.env.ProcessPythonEnvironment;
-import org.apache.flink.python.env.PythonDependencyInfo;
-import org.apache.flink.python.env.PythonEnvironment;
-import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.util.CompressionUtils;
import org.apache.flink.python.util.PythonEnvironmentManagerUtils;
import org.apache.flink.util.FileUtils;
@@ -40,11 +36,8 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
-import java.io.DataOutputStream;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.nio.charset.Charset;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -61,17 +54,30 @@ import java.util.concurrent.locks.ReentrantLock;
import static org.apache.flink.python.util.PythonDependencyUtils.PARAM_DELIMITER;
/**
- * The ProcessPythonEnvironmentManager is used to prepare the working dir of python UDF worker and
- * create ProcessPythonEnvironment object of Beam Fn API. It's used when the python function runner
- * is configured to run python UDF in process mode.
+ * The base class of python environment manager which is used to create the PythonEnvironment object
+ * used to execute Python functions.
*/
@Internal
-public final class ProcessPythonEnvironmentManager implements PythonEnvironmentManager {
+public abstract class AbstractPythonEnvironmentManager implements PythonEnvironmentManager {
private static final Logger LOG =
- LoggerFactory.getLogger(ProcessPythonEnvironmentManager.class);
+ LoggerFactory.getLogger(AbstractPythonEnvironmentManager.class);
- @VisibleForTesting static final String PYFLINK_GATEWAY_DISABLED = "PYFLINK_GATEWAY_DISABLED";
+ private static final long CHECK_INTERVAL = 20;
+ private static final long CHECK_TIMEOUT = 1000;
+
+ private transient Thread shutdownHook;
+
+ protected transient PythonLeasedResource resource;
+
+ protected final PythonDependencyInfo dependencyInfo;
+
+ private final Map<String, String> systemEnv;
+
+ private final String[] tmpDirectories;
+ private final JobID jobID;
+
+ @VisibleForTesting public static final String PYTHON_REQUIREMENTS_DIR = "python-requirements";
@VisibleForTesting
public static final String PYTHON_REQUIREMENTS_FILE = "_PYTHON_REQUIREMENTS_FILE";
@@ -84,23 +90,14 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
@VisibleForTesting public static final String PYTHON_WORKING_DIR = "_PYTHON_WORKING_DIR";
- @VisibleForTesting static final String PYTHON_REQUIREMENTS_DIR = "python-requirements";
- @VisibleForTesting static final String PYTHON_ARCHIVES_DIR = "python-archives";
- @VisibleForTesting static final String PYTHON_FILES_DIR = "python-files";
-
- private static final long CHECK_INTERVAL = 20;
- private static final long CHECK_TIMEOUT = 1000;
-
- private transient Thread shutdownHook;
+ @VisibleForTesting public static final String PYTHON_FILES_DIR = "python-files";
- private transient PythonEnvResources.PythonLeasedResource resource;
+ @VisibleForTesting public static final String PYTHON_ARCHIVES_DIR = "python-archives";
- private final PythonDependencyInfo dependencyInfo;
- private final Map<String, String> systemEnv;
- private final String[] tmpDirectories;
- private final JobID jobID;
+ @VisibleForTesting
+ public static final String PYFLINK_GATEWAY_DISABLED = "PYFLINK_GATEWAY_DISABLED";
- public ProcessPythonEnvironmentManager(
+ public AbstractPythonEnvironmentManager(
PythonDependencyInfo dependencyInfo,
String[] tmpDirectories,
Map<String, String> systemEnv,
@@ -131,7 +128,7 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
});
shutdownHook =
ShutdownHookUtil.addShutdownHook(
- this, ProcessPythonEnvironmentManager.class.getSimpleName(), LOG);
+ this, AbstractPythonEnvironmentManager.class.getSimpleName(), LOG);
}
@Override
@@ -141,68 +138,38 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
} finally {
if (shutdownHook != null) {
ShutdownHookUtil.removeShutdownHook(
- shutdownHook, ProcessPythonEnvironmentManager.class.getSimpleName(), LOG);
+ shutdownHook, AbstractPythonEnvironmentManager.class.getSimpleName(), LOG);
shutdownHook = null;
}
}
}
- @Override
- public PythonEnvironment createEnvironment() throws Exception {
- HashMap<String, String> env = new HashMap<>(resource.env);
-
- String runnerScript =
- PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(
- dependencyInfo.getPythonExec(), env);
-
- return new ProcessPythonEnvironment(runnerScript, env);
+ @VisibleForTesting
+ public String getBaseDirectory() {
+ return resource.baseDirectory;
}
- /**
- * Returns an empty RetrievalToken because no files will be transmit via ArtifactService in
- * process mode.
- *
- * @return The path of empty RetrievalToken.
- */
- @Override
- public String createRetrievalToken() throws IOException {
- File retrievalToken =
- new File(
- resource.baseDirectory,
- "retrieval_token_" + UUID.randomUUID().toString() + ".json");
- if (retrievalToken.createNewFile()) {
- final DataOutputStream dos = new DataOutputStream(new FileOutputStream(retrievalToken));
- dos.writeBytes("{\"manifest\": {}}");
- dos.flush();
- dos.close();
- return retrievalToken.getAbsolutePath();
- } else {
- throw new IOException(
- "Could not create the RetrievalToken file: "
- + retrievalToken.getAbsolutePath());
- }
+ @VisibleForTesting
+ public Map<String, String> getPythonEnv() {
+ return resource.env;
}
/**
* Constructs the environment variables which is used to launch the python UDF worker.
*
- * <p>To avoid unnecessary IO, the artifacts will not be transmitted via the ArtifactService of
- * Beam when running in process mode. Instead, the paths of the artifacts will be passed to the
- * Python UDF worker directly.
- *
* @return The environment variables which contain the paths of the python dependencies.
*/
@VisibleForTesting
- Map<String, String> constructEnvironmentVariables(String baseDirectory)
- throws IOException, IllegalArgumentException {
+ public Map<String, String> constructEnvironmentVariables(String baseDirectory)
+ throws IOException {
Map<String, String> env = new HashMap<>(this.systemEnv);
constructFilesDirectory(env, baseDirectory);
- constructArchivesDirectory(env, baseDirectory);
-
constructRequirementsDirectory(env, baseDirectory);
+ constructArchivesDirectory(env, baseDirectory);
+
// set BOOT_LOG_DIR.
env.put("BOOT_LOG_DIR", baseDirectory);
@@ -220,8 +187,26 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
return env;
}
- @VisibleForTesting
- void installRequirements(String baseDirectory, Map<String, String> env) throws IOException {
+ private static String createBaseDirectory(String[] tmpDirectories) throws IOException {
+ Random rnd = new Random();
+ // try to find a unique file name for the base directory
+ int maxAttempts = 10;
+ for (int attempt = 0; attempt < maxAttempts; attempt++) {
+ String directory = tmpDirectories[rnd.nextInt(tmpDirectories.length)];
+ File baseDirectory = new File(directory, "python-dist-" + UUID.randomUUID().toString());
+ if (baseDirectory.mkdirs()) {
+ return baseDirectory.getAbsolutePath();
+ }
+ }
+
+ throw new IOException(
+ "Could not find a unique directory name in '"
+ + Arrays.toString(tmpDirectories)
+ + "' for storing the generated files of python dependency.");
+ }
+
+ private void installRequirements(String baseDirectory, Map<String, String> env)
+ throws IOException {
// Directory for storing the installation result of the requirements file.
String requirementsDirectory =
String.join(File.separator, baseDirectory, PYTHON_REQUIREMENTS_DIR);
@@ -236,10 +221,6 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
}
}
- public void setEnvironmentVariable(String key, String value) {
- this.systemEnv.put(key, value);
- }
-
private void constructFilesDirectory(Map<String, String> env, String baseDirectory)
throws IOException {
// link or copy python files to filesDirectory and add them to PYTHONPATH
@@ -316,44 +297,8 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
LOG.info("PYTHONPATH of python worker: {}", env.get("PYTHONPATH"));
}
- private void constructArchivesDirectory(Map<String, String> env, String baseDirectory)
- throws IOException {
- // Directory for storing the extracted result of the archive files.
- String archivesDirectory = String.join(File.separator, baseDirectory, PYTHON_ARCHIVES_DIR);
-
- if (!dependencyInfo.getArchives().isEmpty()) {
- // set the archives directory as the working directory, then user could access the
- // content of the archives via relative path
- env.put(PYTHON_WORKING_DIR, archivesDirectory);
- LOG.info("Python working dir of python worker: {}", archivesDirectory);
-
- // extract archives to archives directory
- for (Map.Entry<String, String> entry : dependencyInfo.getArchives().entrySet()) {
- String srcFilePath = entry.getKey();
-
- String originalFileName;
- String targetDirName;
- if (entry.getValue().contains(PARAM_DELIMITER)) {
- String[] filePathAndTargetDir = entry.getValue().split(PARAM_DELIMITER, 2);
- originalFileName = filePathAndTargetDir[0];
- targetDirName = filePathAndTargetDir[1];
- } else {
- originalFileName = entry.getValue();
- targetDirName = originalFileName;
- }
-
- String targetDirPath =
- String.join(File.separator, archivesDirectory, targetDirName);
- CompressionUtils.extractFile(srcFilePath, targetDirPath, originalFileName);
- }
- }
- }
-
private void constructRequirementsDirectory(Map<String, String> env, String baseDirectory)
throws IOException {
- // set the requirements file and the dependencies specified by the requirements file will be
- // installed in
- // boot.py during initialization
String requirementsDirectory =
String.join(File.separator, baseDirectory, PYTHON_REQUIREMENTS_DIR);
if (dependencyInfo.getRequirementsFilePath().isPresent()) {
@@ -377,34 +322,42 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
dependencyInfo.getRequirementsCacheDir().get());
}
- // the dependencies specified by the requirements file will be installed into this
- // directory, and will be
- // added to PYTHONPATH in boot.py
env.put(PYTHON_REQUIREMENTS_INSTALL_DIR, requirementsDirectory);
LOG.info("Requirements install directory of python worker: {}", requirementsDirectory);
}
}
- @VisibleForTesting
- String getBaseDirectory() {
- return resource.baseDirectory;
- }
+ private void constructArchivesDirectory(Map<String, String> env, String baseDirectory)
+ throws IOException {
+ // Directory for storing the extracted result of the archive files.
+ String archivesDirectory = String.join(File.separator, baseDirectory, PYTHON_ARCHIVES_DIR);
- @VisibleForTesting
- Map<String, String> getPythonEnv() {
- return resource.env;
- }
+ if (!dependencyInfo.getArchives().isEmpty()) {
+ // set the archives directory as the working directory, then user could access the
+ // content of the archives via relative path
+ env.put(PYTHON_WORKING_DIR, archivesDirectory);
+ LOG.info("Python working dir of python worker: {}", archivesDirectory);
- @Override
- public String getBootLog() throws Exception {
- File bootLogFile =
- new File(resource.baseDirectory + File.separator + "flink-python-udf-boot.log");
- String msg = "Failed to create stage bundle factory!";
- if (bootLogFile.exists()) {
- byte[] output = Files.readAllBytes(bootLogFile.toPath());
- msg += String.format(" %s", new String(output, Charset.defaultCharset()));
+ // extract archives to archives directory
+ for (Map.Entry<String, String> entry : dependencyInfo.getArchives().entrySet()) {
+ String srcFilePath = entry.getKey();
+
+ String originalFileName;
+ String targetDirName;
+ if (entry.getValue().contains(PARAM_DELIMITER)) {
+ String[] filePathAndTargetDir = entry.getValue().split(PARAM_DELIMITER, 2);
+ originalFileName = filePathAndTargetDir[0];
+ targetDirName = filePathAndTargetDir[1];
+ } else {
+ originalFileName = entry.getValue();
+ targetDirName = originalFileName;
+ }
+
+ String targetDirPath =
+ String.join(File.separator, archivesDirectory, targetDirName);
+ CompressionUtils.extractFile(srcFilePath, targetDirPath, originalFileName);
+ }
}
- return msg;
}
private static void appendToPythonPath(
@@ -424,24 +377,6 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
}
}
- private static String createBaseDirectory(String[] tmpDirectories) throws IOException {
- Random rnd = new Random();
- // try to find a unique file name for the base directory
- int maxAttempts = 10;
- for (int attempt = 0; attempt < maxAttempts; attempt++) {
- String directory = tmpDirectories[rnd.nextInt(tmpDirectories.length)];
- File baseDirectory = new File(directory, "python-dist-" + UUID.randomUUID().toString());
- if (baseDirectory.mkdirs()) {
- return baseDirectory.getAbsolutePath();
- }
- }
-
- throw new IOException(
- "Could not find a unique directory name in '"
- + Arrays.toString(tmpDirectories)
- + "' for storing the generated files of python dependency.");
- }
-
private static final class PythonEnvResources {
private static final ReentrantLock lock = new ReentrantLock();
@@ -502,53 +437,57 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
Map<String, String> env = resource.f1;
return new PythonLeasedResource(baseDirectory, env);
}
+ }
- private static final class PythonLeasedResource implements AutoCloseable {
- private final Map<String, String> env;
+ /**
+ * A leased Python resource which holds the environment variables and the working directory
+ * of the Python execution environment.
+ */
+ public static final class PythonLeasedResource implements AutoCloseable {
+ public final Map<String, String> env;
- /** The base directory of the Python Environment. */
- private final String baseDirectory;
+ /** The base directory of the Python Environment. */
+ public final String baseDirectory;
- /** Keep track of the number of threads sharing this Python environment resources. */
- private long refCount = 0;
+ /** Keep track of the number of threads sharing this Python environment resources. */
+ private long refCount = 0;
- PythonLeasedResource(String baseDirectory, Map<String, String> env) {
- this.baseDirectory = baseDirectory;
- this.env = env;
- }
+ PythonLeasedResource(String baseDirectory, Map<String, String> env) {
+ this.baseDirectory = baseDirectory;
+ this.env = env;
+ }
- void incRef() {
- this.refCount += 1;
- }
+ void incRef() {
+ this.refCount += 1;
+ }
- void decRef() {
- Preconditions.checkState(refCount > 0);
- this.refCount -= 1;
- }
+ void decRef() {
+ Preconditions.checkState(refCount > 0);
+ this.refCount -= 1;
+ }
- @Override
- public void close() throws Exception {
- int retries = 0;
- while (true) {
- try {
- FileUtils.deleteDirectory(new File(baseDirectory));
+ @Override
+ public void close() throws Exception {
+ int retries = 0;
+ while (true) {
+ try {
+ FileUtils.deleteDirectory(new File(baseDirectory));
+ break;
+ } catch (Throwable t) {
+ retries++;
+ if (retries <= CHECK_TIMEOUT / CHECK_INTERVAL) {
+ LOG.warn(
+ String.format(
+ "Failed to delete the working directory %s of the Python UDF worker. Retrying...",
+ baseDirectory),
+ t);
+ } else {
+ LOG.warn(
+ String.format(
+ "Failed to delete the working directory %s of the Python UDF worker.",
+ baseDirectory),
+ t);
break;
- } catch (Throwable t) {
- retries++;
- if (retries <= CHECK_TIMEOUT / CHECK_INTERVAL) {
- LOG.warn(
- String.format(
- "Failed to delete the working directory %s of the Python UDF worker. Retrying...",
- baseDirectory),
- t);
- } else {
- LOG.warn(
- String.format(
- "Failed to delete the working directory %s of the Python UDF worker.",
- baseDirectory),
- t);
- break;
- }
}
}
}
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java b/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java
index b44ae2c..4ef6852 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java
@@ -21,6 +21,7 @@ package org.apache.flink.python.env;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.python.PythonConfig;
+import org.apache.flink.python.PythonOptions;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -69,17 +70,37 @@ public final class PythonDependencyInfo {
*/
@Nonnull private final String pythonExec;
+ /** Execution Mode. */
+ @Nonnull private final String executionMode;
+
public PythonDependencyInfo(
@Nonnull Map<String, String> pythonFiles,
@Nullable String requirementsFilePath,
@Nullable String requirementsCacheDir,
@Nonnull Map<String, String> archives,
@Nonnull String pythonExec) {
+ this(
+ pythonFiles,
+ requirementsFilePath,
+ requirementsCacheDir,
+ archives,
+ pythonExec,
+ PythonOptions.PYTHON_EXECUTION_MODE.defaultValue());
+ }
+
+ public PythonDependencyInfo(
+ @Nonnull Map<String, String> pythonFiles,
+ @Nullable String requirementsFilePath,
+ @Nullable String requirementsCacheDir,
+ @Nonnull Map<String, String> archives,
+ @Nonnull String pythonExec,
+ @Nonnull String executionMode) {
this.pythonFiles = Objects.requireNonNull(pythonFiles);
this.requirementsFilePath = requirementsFilePath;
this.requirementsCacheDir = requirementsCacheDir;
this.pythonExec = Objects.requireNonNull(pythonExec);
this.archives = Objects.requireNonNull(archives);
+ this.executionMode = Objects.requireNonNull(executionMode);
}
public Map<String, String> getPythonFiles() {
@@ -102,6 +123,10 @@ public final class PythonDependencyInfo {
return archives;
}
+ public String getExecutionMode() {
+ return executionMode;
+ }
+
/**
* Creates PythonDependencyInfo from GlobalJobParameters and DistributedCache.
*
@@ -144,6 +169,11 @@ public final class PythonDependencyInfo {
String pythonExec = pythonConfig.getPythonExec();
return new PythonDependencyInfo(
- pythonFiles, requirementsFilePath, requirementsCacheDir, archives, pythonExec);
+ pythonFiles,
+ requirementsFilePath,
+ requirementsCacheDir,
+ archives,
+ pythonExec,
+ pythonConfig.getExecutionMode());
}
}
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/PythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/PythonEnvironmentManager.java
index ddd8560..b317edb 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/PythonEnvironmentManager.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/PythonEnvironmentManager.java
@@ -19,11 +19,10 @@
package org.apache.flink.python.env;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.python.PythonFunctionRunner;
/**
* The base interface of python environment manager which is used to create the PythonEnvironment
- * object and the RetrievalToken.
+ * object used to execute Python functions.
*/
@Internal
public interface PythonEnvironmentManager extends AutoCloseable {
@@ -32,21 +31,10 @@ public interface PythonEnvironmentManager extends AutoCloseable {
void open() throws Exception;
/**
- * Creates the PythonEnvironment object used in {@link PythonFunctionRunner}.
+ * Creates the PythonEnvironment object used to execute Python functions.
*
- * @return The PythonEnvironment object which represents the environment(process, docker, etc)
- * the python worker would run in.
+ * @return The PythonEnvironment object which represents the environment (embedded thread,
+ * process, docker, etc.) the Python worker would run in.
*/
PythonEnvironment createEnvironment() throws Exception;
-
- /**
- * Creates the RetrievalToken used in {@link PythonFunctionRunner}. It contains a list of files
- * which need to transmit through ArtifactService provided by {@link PythonFunctionRunner}.
- *
- * @return The path of the RetrievalToken file.
- */
- String createRetrievalToken() throws Exception;
-
- /** Returns the boot log of the Python Environment. */
- String getBootLog() throws Exception;
}
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/ProcessPythonEnvironment.java b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironment.java
similarity index 65%
copy from flink-python/src/main/java/org/apache/flink/python/env/ProcessPythonEnvironment.java
copy to flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironment.java
index 4aa8099..bfb020a 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/ProcessPythonEnvironment.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironment.java
@@ -16,28 +16,31 @@
* limitations under the License.
*/
-package org.apache.flink.python.env;
+package org.apache.flink.python.env.embedded;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.python.env.PythonEnvironment;
+
+import pemja.core.PythonInterpreterConfig;
import java.util.Map;
-/** A {@link PythonEnvironment} for executing UDFs in Process. */
+/** A {@link PythonEnvironment} for executing UDFs in embedded environment. */
@Internal
-public class ProcessPythonEnvironment implements PythonEnvironment {
- private final String command;
+public class EmbeddedPythonEnvironment implements PythonEnvironment {
+ private final PythonInterpreterConfig config;
private final Map<String, String> env;
- public ProcessPythonEnvironment(String command, Map<String, String> env) {
- this.command = command;
+ public EmbeddedPythonEnvironment(PythonInterpreterConfig config, Map<String, String> env) {
+ this.config = config;
this.env = env;
}
- public Map<String, String> getEnv() {
- return env;
+ public PythonInterpreterConfig getConfig() {
+ return config;
}
- public String getCommand() {
- return command;
+ public Map<String, String> getEnv() {
+ return env;
}
}
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
new file mode 100644
index 0000000..add296a
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.env.embedded;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.python.env.AbstractPythonEnvironmentManager;
+import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.python.env.PythonEnvironment;
+
+import pemja.core.PythonInterpreterConfig;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The EmbeddedPythonEnvironmentManager is used to create the EmbeddedPythonEnvironment object.
+ * It's used to run Python UDFs in an embedded Python environment.
+ */
+@Internal
+public class EmbeddedPythonEnvironmentManager extends AbstractPythonEnvironmentManager {
+
+ public EmbeddedPythonEnvironmentManager(
+ PythonDependencyInfo dependencyInfo,
+ String[] tmpDirectories,
+ Map<String, String> systemEnv,
+ JobID jobID) {
+ super(dependencyInfo, tmpDirectories, systemEnv, jobID);
+ }
+
+ @Override
+ public PythonEnvironment createEnvironment() throws Exception {
+ Map<String, String> env = new HashMap<>(getPythonEnv());
+
+ PythonInterpreterConfig.ExecType execType;
+
+ String executionMode = dependencyInfo.getExecutionMode();
+
+ if (executionMode.equalsIgnoreCase("sub-interpreter")) {
+ execType = PythonInterpreterConfig.ExecType.SUB_INTERPRETER;
+ } else if (executionMode.equalsIgnoreCase("multi-thread")) {
+ execType = PythonInterpreterConfig.ExecType.MULTI_THREAD;
+ } else {
+ throw new RuntimeException(
+ String.format("Unsupported execution mode %s.", executionMode));
+ }
+
+ if (env.containsKey("FLINK_TESTING")) {
+ String flinkHome = env.get("FLINK_HOME");
+ String sourceRootDir = new File(flinkHome, "../../../../").getCanonicalPath();
+ String flinkPython = sourceRootDir + "/flink-python";
+ // add flink-python of source code to PYTHONPATH
+ env.put(
+ "PYTHONPATH",
+ flinkPython + File.pathSeparator + env.getOrDefault("PYTHONPATH", ""));
+ }
+
+ PythonInterpreterConfig interpreterConfig =
+ PythonInterpreterConfig.newBuilder()
+ .setPythonExec(dependencyInfo.getPythonExec())
+ .setExcType(execType)
+ .addPythonPaths(env.getOrDefault("PYTHONPATH", ""))
+ .build();
+
+ return new EmbeddedPythonEnvironment(interpreterConfig, env);
+ }
+}
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/ProcessPythonEnvironment.java b/flink-python/src/main/java/org/apache/flink/python/env/process/ProcessPythonEnvironment.java
similarity index 93%
rename from flink-python/src/main/java/org/apache/flink/python/env/ProcessPythonEnvironment.java
rename to flink-python/src/main/java/org/apache/flink/python/env/process/ProcessPythonEnvironment.java
index 4aa8099..86c13f9 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/ProcessPythonEnvironment.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/process/ProcessPythonEnvironment.java
@@ -16,9 +16,10 @@
* limitations under the License.
*/
-package org.apache.flink.python.env;
+package org.apache.flink.python.env.process;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.python.env.PythonEnvironment;
import java.util.Map;
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/process/ProcessPythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/process/ProcessPythonEnvironmentManager.java
new file mode 100644
index 0000000..b316e06
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/python/env/process/ProcessPythonEnvironmentManager.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.env.process;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.python.env.AbstractPythonEnvironmentManager;
+import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.python.env.PythonEnvironment;
+import org.apache.flink.python.util.PythonEnvironmentManagerUtils;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * The ProcessPythonEnvironmentManager is used to prepare the working dir of python UDF worker and
+ * create ProcessPythonEnvironment object of Beam Fn API. It's used when the python function runner
+ * is configured to run python UDF in process mode.
+ */
+@Internal
+public final class ProcessPythonEnvironmentManager extends AbstractPythonEnvironmentManager {
+
+ public ProcessPythonEnvironmentManager(
+ PythonDependencyInfo dependencyInfo,
+ String[] tmpDirectories,
+ Map<String, String> systemEnv,
+ JobID jobID) {
+ super(dependencyInfo, tmpDirectories, systemEnv, jobID);
+ }
+
+ @Override
+ public PythonEnvironment createEnvironment() throws Exception {
+ HashMap<String, String> env = new HashMap<>(resource.env);
+
+ String runnerScript =
+ PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(
+ dependencyInfo.getPythonExec(), env);
+
+ return new ProcessPythonEnvironment(runnerScript, env);
+ }
+
+ /**
+ * Returns an empty RetrievalToken because no files will be transmitted via ArtifactService in
+ * process mode.
+ *
+ * @return The path of the empty RetrievalToken file.
+ * @throws IOException If the RetrievalToken file cannot be created or written.
+ */
+ public String createRetrievalToken() throws IOException {
+ File retrievalToken =
+ new File(
+ resource.baseDirectory,
+ "retrieval_token_" + UUID.randomUUID().toString() + ".json");
+ if (!retrievalToken.createNewFile()) {
+ throw new IOException(
+ "Could not create the RetrievalToken file: "
+ + retrievalToken.getAbsolutePath());
+ }
+ try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(retrievalToken))) {
+ dos.writeBytes("{\"manifest\": {}}");
+ dos.flush();
+ }
+ return retrievalToken.getAbsolutePath();
+ }
+
+ /** Returns the boot log of the Python Environment. */
+ public String getBootLog() throws Exception {
+ File bootLogFile =
+ new File(resource.baseDirectory + File.separator + "flink-python-udf-boot.log");
+ String msg = "Failed to create stage bundle factory!";
+ if (bootLogFile.exists()) {
+ byte[] output = Files.readAllBytes(bootLogFile.toPath());
+ msg += String.format(" %s", new String(output, Charset.defaultCharset()));
+ }
+ return msg;
+ }
+}
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
index 9da7140..54845eb 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
@@ -37,7 +37,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_WORKING_DIR;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYTHON_WORKING_DIR;
/** Utils used to prepare the python environment. */
@Internal
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractDataStreamPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractDataStreamPythonFunctionOperator.java
index b8feff9..6685abb 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractDataStreamPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractDataStreamPythonFunctionOperator.java
@@ -34,7 +34,7 @@ import java.util.Map;
/** Base class for all Python DataStream operators. */
@Internal
public abstract class AbstractDataStreamPythonFunctionOperator<OUT>
- extends AbstractPythonFunctionOperator<OUT> implements ResultTypeQueryable<OUT> {
+ extends AbstractExternalPythonFunctionOperator<OUT> implements ResultTypeQueryable<OUT> {
private static final long serialVersionUID = 1L;
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
new file mode 100644
index 0000000..f23e3d2
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.PythonConfig;
+import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.python.env.embedded.EmbeddedPythonEnvironment;
+import org.apache.flink.python.env.embedded.EmbeddedPythonEnvironmentManager;
+import org.apache.flink.table.functions.python.PythonEnv;
+
+import pemja.core.PythonInterpreter;
+import pemja.core.PythonInterpreterConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.python.env.AbstractPythonEnvironmentManager.PYTHON_WORKING_DIR;
+
+/**
+ * Abstract class for all stream operators to execute Python functions in embedded Python
+ * environment.
+ *
+ * <p>Every operator instance creates its own {@link PythonInterpreter} in {@link #open()}. When
+ * the Python environment defines {@code PYTHON_WORKING_DIR}, the operators of a job share one
+ * reference-counted bookkeeping entry: the first operator that opens records the interpreter's
+ * current directory and switches to the working directory, and the last operator that closes
+ * switches back and drops the entry.
+ */
+@Internal
+public abstract class AbstractEmbeddedPythonFunctionOperator<OUT>
+        extends AbstractPythonFunctionOperator<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Guards every access to {@link #WORKING_DIRECTORIES}. */
+    private static final ReentrantLock WORKING_DIRECTORY_LOCK = new ReentrantLock();
+
+    /**
+     * Per job: the directory to restore (f0, {@code null} until it was captured) and the number
+     * of open operators that registered for the working directory (f1).
+     */
+    private static final Map<JobID, Tuple2<String, Integer>> WORKING_DIRECTORIES =
+            new HashMap<>();
+
+    /** The python config. */
+    protected transient PythonConfig pythonConfig;
+
+    /** Every operator will hold the only python interpreter. */
+    protected transient PythonInterpreter interpreter;
+
+    private transient EmbeddedPythonEnvironmentManager pythonEnvironmentManager;
+
+    /** Whether this operator instance currently holds a reference in WORKING_DIRECTORIES. */
+    private transient boolean workingDirectoryRegistered;
+
+    public AbstractEmbeddedPythonFunctionOperator(Configuration config) {
+        super(config);
+    }
+
+    /**
+     * Creates the environment manager and the interpreter, switches to the Python working
+     * directory if the environment defines one, and finally delegates to {@link
+     * #openPythonInterpreter}.
+     */
+    @Override
+    public void open() throws Exception {
+        super.open();
+        pythonConfig = new PythonConfig(config);
+        pythonEnvironmentManager = createPythonEnvironmentManager();
+        pythonEnvironmentManager.open();
+        EmbeddedPythonEnvironment environment =
+                (EmbeddedPythonEnvironment) pythonEnvironmentManager.createEnvironment();
+
+        PythonInterpreterConfig interpreterConfig = environment.getConfig();
+        interpreter = new PythonInterpreter(interpreterConfig);
+        Map<String, String> env = environment.getEnv();
+
+        if (env.containsKey(PYTHON_WORKING_DIR)) {
+            registerWorkingDirectory(env.get(PYTHON_WORKING_DIR));
+        }
+
+        openPythonInterpreter(pythonConfig.getPythonExec(), env, interpreterConfig.getExecType());
+    }
+
+    /**
+     * Releases the working directory reference, the interpreter and the environment manager.
+     *
+     * <p>Each step runs in its own {@code finally} so that a failure in one of them cannot leak
+     * the resources released by the following ones; {@code super.close()} is always invoked.
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            releaseWorkingDirectory();
+        } finally {
+            try {
+                if (interpreter != null) {
+                    interpreter.close();
+                    interpreter = null;
+                }
+            } finally {
+                try {
+                    if (pythonEnvironmentManager != null) {
+                        pythonEnvironmentManager.close();
+                        pythonEnvironmentManager = null;
+                    }
+                } finally {
+                    super.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Adds this operator to the job's reference count and, if no directory has been captured for
+     * the job yet, remembers the interpreter's current directory and changes into {@code
+     * workingDirectory}.
+     */
+    private void registerWorkingDirectory(String workingDirectory) throws InterruptedException {
+        JobID jobId = getRuntimeContext().getJobId();
+        WORKING_DIRECTORY_LOCK.lockInterruptibly();
+        try {
+            Tuple2<String, Integer> dirAndNums = WORKING_DIRECTORIES.get(jobId);
+            if (dirAndNums == null) {
+                dirAndNums = Tuple2.of(null, 0);
+                WORKING_DIRECTORIES.put(jobId, dirAndNums);
+            }
+            dirAndNums.f1 += 1;
+            // Remember the increment so that close() only gives back what open() has taken.
+            workingDirectoryRegistered = true;
+
+            if (dirAndNums.f0 == null) {
+                // get current directory.
+                interpreter.exec("import os;cwd = os.getcwd();");
+                dirAndNums.f0 = interpreter.get("cwd", String.class);
+                // set working directory
+                interpreter.exec(String.format("import os;os.chdir('%s')", workingDirectory));
+            }
+        } finally {
+            WORKING_DIRECTORY_LOCK.unlock();
+        }
+    }
+
+    /**
+     * Gives back the reference taken by {@link #registerWorkingDirectory}. When the last
+     * reference of the job is released, the entry is removed and the previously captured
+     * directory is restored.
+     */
+    private void releaseWorkingDirectory() {
+        if (!workingDirectoryRegistered) {
+            return;
+        }
+        JobID jobId = getRuntimeContext().getJobId();
+        // Not interruptible on purpose: a cancelled task must still release its reference,
+        // otherwise the job's entry would never be removed.
+        WORKING_DIRECTORY_LOCK.lock();
+        try {
+            workingDirectoryRegistered = false;
+            Tuple2<String, Integer> dirAndNums = WORKING_DIRECTORIES.get(jobId);
+            if (dirAndNums == null) {
+                return;
+            }
+            dirAndNums.f1 -= 1;
+            if (dirAndNums.f1 <= 0) {
+                // Drop the entry so that a later open() for the same job starts from scratch.
+                WORKING_DIRECTORIES.remove(jobId);
+                // f0 stays null when open() failed before the directory was captured.
+                if (dirAndNums.f0 != null && interpreter != null) {
+                    // change to previous working directory.
+                    interpreter.exec(String.format("import os;os.chdir('%s')", dirAndNums.f0));
+                }
+            }
+        } finally {
+            WORKING_DIRECTORY_LOCK.unlock();
+        }
+    }
+
+    /**
+     * Creates the {@link EmbeddedPythonEnvironmentManager} from the Python dependencies of this
+     * operator, the task manager's temporary directories, a copy of the system environment and
+     * the job id.
+     */
+    @Override
+    protected EmbeddedPythonEnvironmentManager createPythonEnvironmentManager() {
+        PythonDependencyInfo dependencyInfo =
+                PythonDependencyInfo.create(
+                        pythonConfig, getRuntimeContext().getDistributedCache());
+        return new EmbeddedPythonEnvironmentManager(
+                dependencyInfo,
+                getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
+                new HashMap<>(System.getenv()),
+                getRuntimeContext().getJobId());
+    }
+
+    /** Setup method for Python Interpreter. It can be used for initialization work. */
+    public abstract void openPythonInterpreter(
+            String pythonExecutable,
+            Map<String, String> env,
+            PythonInterpreterConfig.ExecType execType)
+            throws Exception;
+
+    /** Returns the {@link PythonEnv} used to create PythonEnvironmentManager. */
+    public abstract PythonEnv getPythonEnv();
+
+    /** Gets the proto representation of the Python user-defined functions to be executed. */
+    public abstract FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto();
+}
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java
new file mode 100644
index 0000000..702e42e
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
+import org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner;
+import org.apache.flink.table.functions.python.PythonEnv;
+
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Abstract class for all stream operators to execute Python functions in external environment.
+ *
+ * <p>The operator owns a {@link PythonFunctionRunner} and a single-threaded executor on which
+ * the runner is flushed when a bundle is finished.
+ */
+@Internal
+public abstract class AbstractExternalPythonFunctionOperator<OUT>
+        extends AbstractPythonFunctionOperator<OUT> {
+    /**
+     * The {@link PythonFunctionRunner} which is responsible for Python user-defined function
+     * execution.
+     */
+    protected transient PythonFunctionRunner pythonFunctionRunner;
+
+    /** Executor that runs {@link PythonFunctionRunner#flush()} off the calling thread. */
+    private transient ExecutorService flushThreadPool;
+
+    public AbstractExternalPythonFunctionOperator(Configuration config) {
+        super(config);
+    }
+
+    /** Creates and opens the Python function runner, then creates the flush executor. */
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.pythonFunctionRunner = createPythonFunctionRunner();
+        this.pythonFunctionRunner.open(pythonConfig);
+        this.flushThreadPool = Executors.newSingleThreadExecutor();
+    }
+
+    /**
+     * Closes the runner and shuts down the flush executor.
+     *
+     * <p>The executor is shut down in a {@code finally} block so that its thread is not leaked
+     * when closing the runner fails; {@code super.close()} is always invoked.
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            try {
+                if (pythonFunctionRunner != null) {
+                    pythonFunctionRunner.close();
+                    pythonFunctionRunner = null;
+                }
+            } finally {
+                if (flushThreadPool != null) {
+                    flushThreadPool.shutdown();
+                    flushThreadPool = null;
+                }
+            }
+        } finally {
+            super.close();
+        }
+    }
+
+    /**
+     * Finishes the current bundle if it contains at least one element.
+     *
+     * <p>The runner is flushed on the flush executor while the calling thread keeps taking and
+     * emitting results. Once the flush has finished, the remaining results are emitted, a flush
+     * failure (if any) is rethrown wrapped in a {@link RuntimeException}, the bundle counters are
+     * reset and the pending bundle-finished callback is run.
+     */
+    @Override
+    protected void invokeFinishBundle() throws Exception {
+        if (elementCount > 0) {
+            AtomicBoolean flushThreadFinish = new AtomicBoolean(false);
+            AtomicReference<Throwable> exceptionReference = new AtomicReference<>();
+            flushThreadPool.submit(
+                    () -> {
+                        try {
+                            pythonFunctionRunner.flush();
+                        } catch (Throwable e) {
+                            // handed over to the calling thread, which rethrows it below
+                            exceptionReference.set(e);
+                        } finally {
+                            flushThreadFinish.set(true);
+                            // interrupt the progress of takeResult to avoid the main thread is
+                            // blocked forever.
+                            ((BeamPythonFunctionRunner) pythonFunctionRunner).notifyNoMoreResults();
+                        }
+                    });
+            Tuple2<byte[], Integer> resultTuple;
+            while (!flushThreadFinish.get()) {
+                resultTuple = pythonFunctionRunner.takeResult();
+                // a tuple whose f1 is 0 is not emitted
+                if (resultTuple.f1 != 0) {
+                    emitResult(resultTuple);
+                    emitResults();
+                }
+            }
+            // drain whatever became available before the flush finished
+            emitResults();
+            Throwable flushThreadThrowable = exceptionReference.get();
+            if (flushThreadThrowable != null) {
+                throw new RuntimeException(
+                        "Error while waiting for BeamPythonFunctionRunner flush",
+                        flushThreadThrowable);
+            }
+            elementCount = 0;
+            lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
+            // callback only after current bundle was fully finalized
+            if (bundleFinishedCallback != null) {
+                bundleFinishedCallback.run();
+                bundleFinishedCallback = null;
+            }
+        }
+    }
+
+    /**
+     * Creates the {@link ProcessPythonEnvironmentManager} for this operator.
+     *
+     * @throws UnsupportedOperationException if the execution type of {@link #getPythonEnv()} is
+     *     not {@code PROCESS}
+     */
+    @Override
+    protected ProcessPythonEnvironmentManager createPythonEnvironmentManager() {
+        PythonDependencyInfo dependencyInfo =
+                PythonDependencyInfo.create(
+                        pythonConfig, getRuntimeContext().getDistributedCache());
+        PythonEnv pythonEnv = getPythonEnv();
+        if (pythonEnv.getExecType() == PythonEnv.ExecType.PROCESS) {
+            return new ProcessPythonEnvironmentManager(
+                    dependencyInfo,
+                    getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
+                    new HashMap<>(System.getenv()),
+                    getRuntimeContext().getJobId());
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Execution type '%s' is not supported.", pythonEnv.getExecType()));
+        }
+    }
+
+    /**
+     * Emits results polled from the runner until the poll returns {@code null} or a tuple whose
+     * f1 is 0.
+     */
+    protected void emitResults() throws Exception {
+        Tuple2<byte[], Integer> resultTuple;
+        while ((resultTuple = pythonFunctionRunner.pollResult()) != null && resultTuple.f1 != 0) {
+            emitResult(resultTuple);
+        }
+    }
+
+    /** Sends the execution result to the downstream operator. */
+    public abstract void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception;
+
+    /**
+     * Creates the {@link PythonFunctionRunner} which is responsible for Python user-defined
+     * function execution.
+     */
+    public abstract PythonFunctionRunner createPythonFunctionRunner() throws Exception;
+}
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index d321370..aa1dfe1 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -19,36 +19,26 @@
package org.apache.flink.streaming.api.operators.python;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonConfig;
-import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.PythonOptions;
-import org.apache.flink.python.env.PythonDependencyInfo;
import org.apache.flink.python.env.PythonEnvironmentManager;
-import org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend;
-import org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.WrappingRuntimeException;
import java.lang.reflect.Field;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.streaming.api.utils.ClassLeakCleaner.cleanUpLeakingClasses;
import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode;
@@ -61,12 +51,6 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
protected Configuration config;
- /**
- * The {@link PythonFunctionRunner} which is responsible for Python user-defined function
- * execution.
- */
- protected transient PythonFunctionRunner pythonFunctionRunner;
-
/** Max number of elements to include in a bundle. */
protected transient int maxBundleSize;
@@ -83,15 +67,13 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
private transient long maxBundleTimeMills;
/** Time that the last bundle was finished. */
- private transient long lastFinishBundleTime;
+ protected transient long lastFinishBundleTime;
/** A timer that finishes the current bundle after a fixed amount of time. */
private transient ScheduledFuture<?> checkFinishBundleTimer;
/** Callback to be executed after the current bundle was finished. */
- private transient Runnable bundleFinishedCallback;
-
- private transient ExecutorService flushThreadPool;
+ protected transient Runnable bundleFinishedCallback;
public AbstractPythonFunctionOperator(Configuration config) {
this.config = Preconditions.checkNotNull(config);
@@ -127,9 +109,6 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
this.maxBundleTimeMills);
}
- this.pythonFunctionRunner = createPythonFunctionRunner();
- this.pythonFunctionRunner.open(pythonConfig);
-
this.elementCount = 0;
this.lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
@@ -143,7 +122,6 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
timestamp -> checkInvokeFinishBundleByTime(),
bundleCheckPeriod,
bundleCheckPeriod);
- this.flushThreadPool = Executors.newSingleThreadExecutor();
} finally {
super.open();
}
@@ -165,14 +143,6 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
checkFinishBundleTimer.cancel(true);
checkFinishBundleTimer = null;
}
- if (pythonFunctionRunner != null) {
- pythonFunctionRunner.close();
- pythonFunctionRunner = null;
- }
- if (flushThreadPool != null) {
- flushThreadPool.shutdown();
- flushThreadPool = null;
- }
} finally {
super.close();
@@ -296,24 +266,12 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
return config;
}
- /**
- * Creates the {@link PythonFunctionRunner} which is responsible for Python user-defined
- * function execution.
- */
- public abstract PythonFunctionRunner createPythonFunctionRunner() throws Exception;
-
/** Returns the {@link PythonEnv} used to create PythonEnvironmentManager.. */
public abstract PythonEnv getPythonEnv();
- /** Sends the execution result to the downstream operator. */
- public abstract void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception;
+ protected abstract void invokeFinishBundle() throws Exception;
- protected void emitResults() throws Exception {
- Tuple2<byte[], Integer> resultTuple;
- while ((resultTuple = pythonFunctionRunner.pollResult()) != null && resultTuple.f1 != 0) {
- emitResult(resultTuple);
- }
- }
+ protected abstract PythonEnvironmentManager createPythonEnvironmentManager();
/** Checks whether to invoke finishBundle by elements count. Called in processElement. */
protected void checkInvokeFinishBundleByCount() throws Exception {
@@ -330,66 +288,6 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
}
}
- protected void invokeFinishBundle() throws Exception {
- if (elementCount > 0) {
- AtomicBoolean flushThreadFinish = new AtomicBoolean(false);
- AtomicReference<Throwable> exceptionReference = new AtomicReference<>();
- flushThreadPool.submit(
- () -> {
- try {
- pythonFunctionRunner.flush();
- } catch (Throwable e) {
- exceptionReference.set(e);
- } finally {
- flushThreadFinish.set(true);
- // interrupt the progress of takeResult to avoid the main thread is
- // blocked forever.
- ((BeamPythonFunctionRunner) pythonFunctionRunner).notifyNoMoreResults();
- }
- });
- Tuple2<byte[], Integer> resultTuple;
- while (!flushThreadFinish.get()) {
- resultTuple = pythonFunctionRunner.takeResult();
- if (resultTuple.f1 != 0) {
- emitResult(resultTuple);
- emitResults();
- }
- }
- emitResults();
- Throwable flushThreadThrowable = exceptionReference.get();
- if (flushThreadThrowable != null) {
- throw new RuntimeException(
- "Error while waiting for BeamPythonFunctionRunner flush",
- flushThreadThrowable);
- }
- elementCount = 0;
- lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
- // callback only after current bundle was fully finalized
- if (bundleFinishedCallback != null) {
- bundleFinishedCallback.run();
- bundleFinishedCallback = null;
- }
- }
- }
-
- protected PythonEnvironmentManager createPythonEnvironmentManager() {
- PythonDependencyInfo dependencyInfo =
- PythonDependencyInfo.create(
- pythonConfig, getRuntimeContext().getDistributedCache());
- PythonEnv pythonEnv = getPythonEnv();
- if (pythonEnv.getExecType() == PythonEnv.ExecType.PROCESS) {
- return new ProcessPythonEnvironmentManager(
- dependencyInfo,
- getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
- new HashMap<>(System.getenv()),
- getRuntimeContext().getJobId());
- } else {
- throw new UnsupportedOperationException(
- String.format(
- "Execution type '%s' is not supported.", pythonEnv.getExecType()));
- }
- }
-
protected FlinkMetricContainer getFlinkMetricContainer() {
return this.pythonConfig.isMetricEnabled()
? new FlinkMetricContainer(getRuntimeContext().getMetricGroup())
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
index 7347498..a76871f 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.runners.python.beam;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.state.KeyedStateBackend;
@@ -68,7 +68,7 @@ public class BeamDataStreamPythonFunctionRunner extends BeamPythonFunctionRunner
public BeamDataStreamPythonFunctionRunner(
String taskName,
- PythonEnvironmentManager environmentManager,
+ ProcessPythonEnvironmentManager environmentManager,
String headOperatorFunctionUrn,
List<FlinkFnApi.UserDefinedDataStreamFunction> userDefinedDataStreamFunctions,
Map<String, String> jobOptions,
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 0498ea9..86264ab 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -26,9 +26,9 @@ import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonConfig;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.PythonOptions;
-import org.apache.flink.python.env.ProcessPythonEnvironment;
import org.apache.flink.python.env.PythonEnvironment;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironment;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
@@ -109,8 +109,8 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
private final String taskName;
- /** The Python execution environment manager. */
- private final PythonEnvironmentManager environmentManager;
+ /** The Python process execution environment manager. */
+ private final ProcessPythonEnvironmentManager environmentManager;
/** The options used to configure the Python worker process. */
private final Map<String, String> jobOptions;
@@ -177,7 +177,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
public BeamPythonFunctionRunner(
String taskName,
- PythonEnvironmentManager environmentManager,
+ ProcessPythonEnvironmentManager environmentManager,
Map<String, String> jobOptions,
@Nullable FlinkMetricContainer flinkMetricContainer,
@Nullable KeyedStateBackend keyedStateBackend,
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractOneInputPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractOneInputPythonFunctionOperator.java
index 503bc09..92dbbd5 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractOneInputPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractOneInputPythonFunctionOperator.java
@@ -22,12 +22,12 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
+import org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator;
/** Base class for all one input stream operators to execute Python functions. */
@Internal
public abstract class AbstractOneInputPythonFunctionOperator<IN, OUT>
- extends AbstractPythonFunctionOperator<OUT>
+ extends AbstractExternalPythonFunctionOperator<OUT>
implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
private static final long serialVersionUID = 1L;
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
new file mode 100644
index 0000000..848b678
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.python.AbstractEmbeddedPythonFunctionOperator;
+import org.apache.flink.streaming.api.utils.ProtoUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.operators.python.utils.StreamRecordRowDataWrappingCollector;
+import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import pemja.core.PythonInterpreterConfig;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The Python {@link ScalarFunction} operator in embedded Python environment. */
+@Internal
+public class EmbeddedPythonScalarFunctionOperator
+ extends AbstractEmbeddedPythonFunctionOperator<RowData>
+ implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
+
+ private static final long serialVersionUID = 1L;
+
+ /** The Python {@link ScalarFunction}s to be executed. */
+ private final PythonFunctionInfo[] scalarFunctions;
+
+ /** The offsets of user-defined function inputs. */
+ private final int[] udfInputOffsets;
+
+ /** The input logical type. */
+ protected final RowType inputType;
+
+ /** The user-defined function input logical type. */
+ protected final RowType udfInputType;
+
+ /** The user-defined function output logical type. */
+ protected final RowType udfOutputType;
+
+ private GeneratedProjection forwardedFieldGeneratedProjection;
+
+ /** The GenericRowData reused holding the execution result of python udf. */
+ private GenericRowData reuseResultRowData;
+
+ /** The collector used to collect records. */
+ private transient StreamRecordRowDataWrappingCollector rowDataWrapper;
+
+ /** The Projection which projects the forwarded fields from the input row. */
+ private transient Projection<RowData, BinaryRowData> forwardedFieldProjection;
+
+ private transient PythonTypeUtils.DataConverter[] userDefinedFunctionInputConverters;
+ private transient Object[] userDefinedFunctionInputArgs;
+ private transient PythonTypeUtils.DataConverter[] userDefinedFunctionOutputConverters;
+
+ /** Whether there is only one input argument. */
+ private transient boolean isOneArg;
+
+ /** Whether is only one field of udf result. */
+ private transient boolean isOneFieldResult;
+
+    /**
+     * Creates the operator without a forwarded-field projection.
+     *
+     * <p>The configuration is handed to the parent constructor; every other argument is
+     * null-checked here.
+     */
+    public EmbeddedPythonScalarFunctionOperator(
+            Configuration config,
+            PythonFunctionInfo[] scalarFunctions,
+            RowType inputType,
+            RowType udfInputType,
+            RowType udfOutputType,
+            int[] udfInputOffsets) {
+        super(config);
+        // assigned in declaration order of the parameters
+        this.scalarFunctions = Preconditions.checkNotNull(scalarFunctions);
+        this.inputType = Preconditions.checkNotNull(inputType);
+        this.udfInputType = Preconditions.checkNotNull(udfInputType);
+        this.udfOutputType = Preconditions.checkNotNull(udfOutputType);
+        this.udfInputOffsets = Preconditions.checkNotNull(udfInputOffsets);
+    }
+
+    /**
+     * Creates the operator with a generated projection for the forwarded fields.
+     *
+     * <p>Delegates to the six-argument constructor and additionally requires {@code
+     * forwardedFieldGeneratedProjection} to be non-null.
+     */
+    public EmbeddedPythonScalarFunctionOperator(
+            Configuration config,
+            PythonFunctionInfo[] scalarFunctions,
+            RowType inputType,
+            RowType udfInputType,
+            RowType udfOutputType,
+            int[] udfInputOffsets,
+            GeneratedProjection forwardedFieldGeneratedProjection) {
+        this(config, scalarFunctions, inputType, udfInputType, udfOutputType, udfInputOffsets);
+        final GeneratedProjection checkedProjection =
+                Preconditions.checkNotNull(forwardedFieldGeneratedProjection);
+        this.forwardedFieldGeneratedProjection = checkedProjection;
+    }
+
+    /**
+     * Opens the operator: derives the single-argument / single-result flags, opens the parent
+     * and then creates the collector, the reusable result row, the input and output converters
+     * and, if a generated projection was supplied, the forwarded-field projection.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void open() throws Exception {
+        final int argumentCount = udfInputOffsets.length;
+        final int resultFieldCount = udfOutputType.getFieldCount();
+
+        // Both flags are assigned before the parent is opened.
+        isOneArg = argumentCount == 1;
+        isOneFieldResult = resultFieldCount == 1;
+        super.open();
+
+        rowDataWrapper = new StreamRecordRowDataWrappingCollector(output);
+        reuseResultRowData = new GenericRowData(resultFieldCount);
+
+        // Row type made of the input fields selected by the UDF input offsets.
+        final RowType selectedInputType =
+                new RowType(
+                        Arrays.stream(udfInputOffsets)
+                                .mapToObj(offset -> inputType.getFields().get(offset))
+                                .collect(Collectors.toList()));
+        userDefinedFunctionInputConverters =
+                selectedInputType.getFields().stream()
+                        .map(field -> PythonTypeUtils.toDataConverter(field.getType()))
+                        .toArray(PythonTypeUtils.DataConverter[]::new);
+        userDefinedFunctionInputArgs = new Object[argumentCount];
+        userDefinedFunctionOutputConverters =
+                udfOutputType.getFields().stream()
+                        .map(field -> PythonTypeUtils.toDataConverter(field.getType()))
+                        .toArray(PythonTypeUtils.DataConverter[]::new);
+
+        if (forwardedFieldGeneratedProjection == null) {
+            return;
+        }
+        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        forwardedFieldProjection =
+                forwardedFieldGeneratedProjection.newInstance(contextClassLoader);
+    }
+
+    /**
+     * Creates the Python object {@code scalar_operation} inside the interpreter and invokes its
+     * {@code open} method.
+     *
+     * <p>For {@code SUB_INTERPRETER} the operation is created by a script run through {@code
+     * executeScript} with {@code pythonExecutable} and {@code env}, and the script output is
+     * deserialized inside the interpreter. For any other execution type the operation is created
+     * directly inside the interpreter from the proto bytes.
+     */
+    @Override
+    public void openPythonInterpreter(
+            String pythonExecutable,
+            Map<String, String> env,
+            PythonInterpreterConfig.ExecType execType)
+            throws Exception {
+        // Python boolean literals spliced into the generated statements below.
+        final String oneArgLiteral = isOneArg ? "True" : "False";
+        final String oneFieldResultLiteral = isOneFieldResult ? "True" : "False";
+
+        if (execType.equals(PythonInterpreterConfig.ExecType.SUB_INTERPRETER)) {
+            LOG.info("Create Operation in sub-interpreters.");
+            final String script =
+                    String.format(
+                            "from pyflink.fn_execution.utils.operation_utils import create_serialized_scalar_operation_from_proto;"
+                                    + "print(create_serialized_scalar_operation_from_proto(%s, %s, %s))",
+                            Arrays.toString(getUserDefinedFunctionsProto().toByteArray()),
+                            oneArgLiteral,
+                            oneFieldResultLiteral);
+            final String[] commands = new String[] {pythonExecutable, "-c", script};
+
+            interpreter.exec(
+                    "from pyflink.fn_execution.utils.operation_utils import deserialized_operation_from_serialized_bytes");
+            final String deserializeStatement =
+                    String.format(
+                            "scalar_operation = deserialized_operation_from_serialized_bytes(%s)",
+                            executeScript(commands, env));
+            interpreter.exec(deserializeStatement);
+        } else {
+            LOG.info("Create Operation in multi-threads.");
+
+            // The CPython extension included in proto does not support initialization
+            // multiple times, so we choose the only interpreter process to be responsible for
+            // initialization and proto parsing. The only interpreter parses the proto and
+            // serializes function operations with cloudpickle.
+            interpreter.exec(
+                    "from pyflink.fn_execution.utils.operation_utils import create_scalar_operation_from_proto");
+            interpreter.set("proto", getUserDefinedFunctionsProto().toByteArray());
+
+            final String createStatement =
+                    String.format(
+                            "scalar_operation = create_scalar_operation_from_proto(proto, %s, %s)",
+                            oneArgLiteral, oneFieldResultLiteral);
+            interpreter.exec(createStatement);
+        }
+
+        // invoke `open` method of ScalarOperation.
+        interpreter.invokeMethod("scalar_operation", "open");
+    }
+
+ /** Closes the Python-side {@code scalar_operation} when an interpreter is present. */
+ @Override
+ public void endInput() {
+     // Without an interpreter there is no Python operation to close.
+     if (interpreter == null) {
+         return;
+     }
+     // invoke `close` method of ScalarOperation.
+     interpreter.invokeMethod("scalar_operation", "close");
+ }
+
+ /** Returns the Python environment of the first scalar function of this operator. */
+ @Override
+ public PythonEnv getPythonEnv() {
+     PythonFunctionInfo firstFunction = scalarFunctions[0];
+     return firstFunction.getPythonFunction().getPythonEnv();
+ }
+
+ /**
+ * Evaluates the Python scalar function(s) for one input row and emits the result.
+ *
+ * <p>The input fields selected by {@code udfInputOffsets} are converted to their external
+ * form, passed to the {@code process_element} method of the Python {@code scalar_operation},
+ * and the returned value(s) are converted back and written into the reused result row. The
+ * result row is emitted either on its own or joined behind the forwarded fields.
+ *
+ * @param element the stream record holding the input row
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void processElement(StreamRecord<RowData> element) {
+ RowData value = element.getValue();
+
+ // Shape of the argument handed to Python: the shared Object[] for several arguments,
+ // the bare converted value for exactly one argument, and null when there are none.
+ Object udfArgs = null;
+ if (userDefinedFunctionInputArgs.length > 1) {
+ for (int i = 0; i < userDefinedFunctionInputArgs.length; i++) {
+ userDefinedFunctionInputArgs[i] =
+ userDefinedFunctionInputConverters[i].toExternal(value, udfInputOffsets[i]);
+ }
+ udfArgs = userDefinedFunctionInputArgs;
+ } else if (userDefinedFunctionInputArgs.length == 1) {
+ udfArgs = userDefinedFunctionInputConverters[0].toExternal(value, udfInputOffsets[0]);
+ }
+
+ if (isOneFieldResult) {
+ // Single result field: the returned object is the field value itself.
+ Object udfResult =
+ interpreter.invokeMethod("scalar_operation", "process_element", udfArgs);
+ reuseResultRowData.setField(
+ 0, userDefinedFunctionOutputConverters[0].toInternal(udfResult));
+ } else {
+ // Several result fields: the returned object is cast to Object[] and converted
+ // position by position.
+ Object[] udfResult =
+ (Object[])
+ interpreter.invokeMethod(
+ "scalar_operation", "process_element", udfArgs);
+ for (int i = 0; i < udfResult.length; i++) {
+ reuseResultRowData.setField(
+ i, userDefinedFunctionOutputConverters[i].toInternal(udfResult[i]));
+ }
+ }
+
+ if (forwardedFieldProjection != null) {
+ // The projected row is copied before joining; presumably the projection reuses its
+ // output instance between calls - TODO confirm.
+ BinaryRowData forwardedRowData = forwardedFieldProjection.apply(value).copy();
+ // Note: despite its name, a new JoinedRowData is allocated for every element.
+ JoinedRowData reuseJoinedRow = new JoinedRowData(forwardedRowData, reuseResultRowData);
+ rowDataWrapper.collect(reuseJoinedRow);
+ } else {
+ rowDataWrapper.collect(reuseResultRowData);
+ }
+ }
+
+ /**
+ * Currently a no-op: every element is evaluated directly in {@code processElement}, so there
+ * is no buffered bundle to flush here.
+ */
+ @Override
+ protected void invokeFinishBundle() throws Exception {
+ // TODO: Support batches invoking.
+ }
+
+ /**
+  * Builds the {@code UserDefinedFunctions} proto message for this operator: one UDF entry per
+  * scalar function plus the metric and profile switches taken from the Python config.
+  */
+ @Override
+ public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() {
+     FlinkFnApi.UserDefinedFunctions.Builder protoBuilder =
+             FlinkFnApi.UserDefinedFunctions.newBuilder();
+     // add udf proto
+     for (int i = 0; i < scalarFunctions.length; i++) {
+         protoBuilder.addUdfs(ProtoUtils.getUserDefinedFunctionProto(scalarFunctions[i]));
+     }
+     return protoBuilder
+             .setMetricEnabled(pythonConfig.isMetricEnabled())
+             .setProfileEnabled(pythonConfig.isProfileEnabled())
+             .build();
+ }
+
+ /**
+  * Runs the given command as a subprocess and returns everything it printed.
+  *
+  * <p>The entries of {@code env} are added to the subprocess environment and stderr is merged
+  * into stdout, so the returned text contains both streams. Every line that is read is
+  * re-terminated with {@code "\n"}.
+  *
+  * @param commands the executable followed by its arguments
+  * @param env additional environment variables for the subprocess
+  * @return the combined stdout/stderr output of the subprocess
+  * @throws IOException if the subprocess cannot be started, its output cannot be read, or it
+  *     exits with a non-zero status
+  */
+ private String executeScript(final String[] commands, Map<String, String> env)
+         throws IOException {
+     ProcessBuilder pb = new ProcessBuilder(commands);
+     pb.environment().putAll(env);
+     // NOTE(review): with stderr merged, anything the subprocess writes to stderr during a
+     // successful run also ends up in the returned text.
+     pb.redirectErrorStream(true);
+     Process p = pb.start();
+     InputStream in = new BufferedInputStream(p.getInputStream());
+     StringBuilder out = new StringBuilder();
+     String s;
+     try (BufferedReader br = new BufferedReader(new InputStreamReader(in))) {
+         while ((s = br.readLine()) != null) {
+             out.append(s).append("\n");
+         }
+     }
+     try {
+         if (p.waitFor() != 0) {
+             throw new IOException(
+                     String.format(
+                             "Failed to execute the command: %s\noutput: %s",
+                             String.join(" ", commands), out));
+         }
+     } catch (InterruptedException e) {
+         // The subprocess has closed its output once "br.readLine()" returned null, so
+         // "waitFor" is expected to return immediately and the collected output is still
+         // returned. The interrupt status is restored so that callers further up the stack
+         // can still observe the interruption instead of it being silently lost.
+         Thread.currentThread().interrupt();
+     }
+     return out.toString();
+ }
+}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
index c0d86d6..d502a47 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.runners.python.beam;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.state.KeyedStateBackend;
@@ -54,7 +54,7 @@ public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner {
public BeamTablePythonFunctionRunner(
String taskName,
- PythonEnvironmentManager environmentManager,
+ ProcessPythonEnvironmentManager environmentManager,
String functionUrn,
GeneratedMessageV3 userDefinedFunctionProto,
Map<String, String> jobOptions,
@@ -119,7 +119,7 @@ public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner {
public static BeamTablePythonFunctionRunner stateless(
String taskName,
- PythonEnvironmentManager environmentManager,
+ ProcessPythonEnvironmentManager environmentManager,
String functionUrn,
GeneratedMessageV3 userDefinedFunctionProto,
Map<String, String> jobOptions,
@@ -146,7 +146,7 @@ public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner {
public static BeamTablePythonFunctionRunner stateful(
String taskName,
- PythonEnvironmentManager environmentManager,
+ ProcessPythonEnvironmentManager environmentManager,
String functionUrn,
GeneratedMessageV3 userDefinedFunctionProto,
Map<String, String> jobOptions,
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
index b04d12b..8d7a832 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
@@ -30,6 +30,8 @@ import org.apache.flink.api.common.typeutils.base.ShortSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.runtime.typeutils.serializers.python.ArrayDataSerializer;
import org.apache.flink.table.runtime.typeutils.serializers.python.DecimalDataSerializer;
import org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer;
@@ -56,9 +58,12 @@ import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+import org.apache.flink.table.types.utils.TypeConversions;
+import java.io.Serializable;
import java.math.BigDecimal;
import java.math.RoundingMode;
+import java.sql.Time;
import java.util.TimeZone;
/**
@@ -84,6 +89,10 @@ public final class PythonTypeUtils {
return logicalType.accept(new LogicalTypetoInternalSerializerConverter());
}
+ /**
+  * Returns the {@link DataConverter} for the given logical type by dispatching on the type
+  * with a {@code LogicalTypeToDataConverter} visitor.
+  */
+ public static DataConverter toDataConverter(LogicalType logicalType) {
+     LogicalTypeToDataConverter converterVisitor = new LogicalTypeToDataConverter();
+     return logicalType.accept(converterVisitor);
+ }
+
/**
* Convert the specified bigDecimal according to the specified precision and scale. The
* specified bigDecimal may be rounded to have the specified scale and then the specified
@@ -458,4 +467,267 @@ public final class PythonTypeUtils {
logicalType.asSummaryString()));
}
}
+
+ /**
+  * Data Converter that converts the data to the java format data which can be used in PemJa.
+  *
+  * <p>Conversion happens in two steps: the wrapped {@link
+  * DataFormatConverters.DataFormatConverter} maps between {@code IN} and {@code INTER}, and
+  * the subclass hooks {@link #toInternalImpl} / {@link #toExternalImpl} map between {@code
+  * INTER} and {@code OUT}. {@code null} values never reach the subclass hooks; they are passed
+  * through as {@code null}.
+  *
+  * @param <IN> the type returned by {@link #toInternal}
+  * @param <INTER> the intermediate type exchanged with the wrapped format converter
+  * @param <OUT> the type returned by {@link #toExternal} and accepted by {@link #toInternal}
+  */
+ public abstract static class DataConverter<IN, INTER, OUT> implements Serializable {
+
+     private static final long serialVersionUID = 1L;
+
+     private final DataFormatConverters.DataFormatConverter<IN, INTER> dataFormatConverter;
+
+     public DataConverter(
+             DataFormatConverters.DataFormatConverter<IN, INTER> dataFormatConverter) {
+         this.dataFormatConverter = dataFormatConverter;
+     }
+
+     /**
+      * Converts an external value to the internal format.
+      *
+      * @param value the external value, may be {@code null}
+      * @return the internal representation produced by the wrapped format converter
+      */
+     public final IN toInternal(OUT value) {
+         // The subclass hooks dereference their argument (e.g. value.byteValue()), so a null
+         // value must bypass them instead of causing a NullPointerException.
+         INTER intermediate = value == null ? null : toInternalImpl(value);
+         return dataFormatConverter.toInternal(intermediate);
+     }
+
+     /**
+      * Reads the field at {@code column} of {@code row} and converts it to the external format.
+      *
+      * @param row the row to read from
+      * @param column the position of the field inside the row
+      * @return the external value, or {@code null} if the wrapped converter yields {@code null}
+      */
+     public final OUT toExternal(RowData row, int column) {
+         INTER intermediate = dataFormatConverter.toExternal(row, column);
+         // Same reasoning as in toInternal: keep null away from the subclass hook.
+         return intermediate == null ? null : toExternalImpl(intermediate);
+     }
+
+     /** Maps a non-null external value to the intermediate type. */
+     abstract INTER toInternalImpl(OUT value);
+
+     /** Maps a non-null intermediate value to the external type. */
+     abstract OUT toExternalImpl(INTER value);
+ }
+
+ /**
+ * Identity data converter: the intermediate and the external type are the same, so both hooks
+ * return their argument unchanged and all conversion work is left to the wrapped format
+ * converter.
+ */
+ public static final class IdentityDataConverter<IN, OUT> extends DataConverter<IN, OUT, OUT> {
+ IdentityDataConverter(
+ DataFormatConverters.DataFormatConverter<IN, OUT> dataFormatConverter) {
+ super(dataFormatConverter);
+ }
+
+ // Pass-through: no additional conversion towards the internal side.
+ @Override
+ OUT toInternalImpl(OUT value) {
+ return value;
+ }
+
+ // Pass-through: no additional conversion towards the external side.
+ @Override
+ OUT toExternalImpl(OUT value) {
+ return value;
+ }
+ }
+
+ /**
+ * Python Long will be converted to Long in PemJa, so we need ByteDataConverter to convert Java
+ * Long to internal Byte.
+ */
+ public static final class ByteDataConverter extends DataConverter<Byte, Byte, Long> {
+
+ // Stateless, hence shared as a singleton.
+ public static final ByteDataConverter INSTANCE = new ByteDataConverter();
+
+ private ByteDataConverter() {
+ super(DataFormatConverters.ByteConverter.INSTANCE);
+ }
+
+ // Narrows the Long to a byte; Long#byteValue silently truncates values outside the byte
+ // range. NOTE(review): value is dereferenced here without a null check.
+ @Override
+ Byte toInternalImpl(Long value) {
+ return value.byteValue();
+ }
+
+ // Widens the byte to a Long. NOTE(review): value is dereferenced without a null check.
+ @Override
+ Long toExternalImpl(Byte value) {
+ return value.longValue();
+ }
+ }
+
+ /**
+ * Python Long will be converted to Long in PemJa, so we need ShortDataConverter to convert Java
+ * Long to internal Short.
+ */
+ public static final class ShortDataConverter extends DataConverter<Short, Short, Long> {
+
+ // Stateless, hence shared as a singleton.
+ public static final ShortDataConverter INSTANCE = new ShortDataConverter();
+
+ private ShortDataConverter() {
+ super(DataFormatConverters.ShortConverter.INSTANCE);
+ }
+
+ // Narrows the Long to a short; Long#shortValue silently truncates values outside the
+ // short range. NOTE(review): value is dereferenced here without a null check.
+ @Override
+ Short toInternalImpl(Long value) {
+ return value.shortValue();
+ }
+
+ // Widens the short to a Long. NOTE(review): value is dereferenced without a null check.
+ @Override
+ Long toExternalImpl(Short value) {
+ return value.longValue();
+ }
+ }
+
+ /**
+ * Python Long will be converted to Long in PemJa, so we need IntDataConverter to convert Java
+ * Long to internal Integer.
+ */
+ public static final class IntDataConverter extends DataConverter<Integer, Integer, Long> {
+
+ // Stateless, hence shared as a singleton.
+ public static final IntDataConverter INSTANCE = new IntDataConverter();
+
+ private IntDataConverter() {
+ super(DataFormatConverters.IntConverter.INSTANCE);
+ }
+
+ // Narrows the Long to an int; Long#intValue silently truncates values outside the int
+ // range. NOTE(review): value is dereferenced here without a null check.
+ @Override
+ Integer toInternalImpl(Long value) {
+ return value.intValue();
+ }
+
+ // Widens the int to a Long. NOTE(review): value is dereferenced without a null check.
+ @Override
+ Long toExternalImpl(Integer value) {
+ return value.longValue();
+ }
+ }
+
+ /**
+ * Python Float will be converted to Double in PemJa, so we need FloatDataConverter to convert
+ * Java Double to internal Float.
+ */
+ public static final class FloatDataConverter extends DataConverter<Float, Float, Double> {
+
+ // Stateless, hence shared as a singleton.
+ public static final FloatDataConverter INSTANCE = new FloatDataConverter();
+
+ private FloatDataConverter() {
+ super(DataFormatConverters.FloatConverter.INSTANCE);
+ }
+
+ // Narrows the Double to a float, which may lose precision.
+ // NOTE(review): value is dereferenced here without a null check.
+ @Override
+ Float toInternalImpl(Double value) {
+ return value.floatValue();
+ }
+
+ // Widens the float to a Double. NOTE(review): value is dereferenced without a null check.
+ @Override
+ Double toExternalImpl(Float value) {
+ return value.doubleValue();
+ }
+ }
+
+ /**
+ * Python datetime.time will be converted to Time in PemJa, so we need TimeDataConverter to
+ * convert Java Time to internal Integer.
+ */
+ public static final class TimeDataConverter extends DataConverter<Integer, Integer, Time> {
+
+ // Stateless, hence shared as a singleton.
+ public static final TimeDataConverter INSTANCE = new TimeDataConverter();
+
+ private TimeDataConverter() {
+ super(DataFormatConverters.IntConverter.INSTANCE);
+ }
+
+ // Time#getTime returns a long millisecond value, which is cast down to int here.
+ // NOTE(review): value is dereferenced without a null check.
+ @Override
+ Integer toInternalImpl(Time value) {
+ return (int) value.getTime();
+ }
+
+ // The int is unboxed and widened to the long millisecond argument of the Time constructor.
+ // NOTE(review): a null value would fail on unboxing.
+ @Override
+ Time toExternalImpl(Integer value) {
+ return new Time(value);
+ }
+ }
+
+ /**
+ * Visitor that picks the {@link DataConverter} for a logical type. TINYINT, SMALLINT, INT,
+ * FLOAT and TIME get a dedicated converter; the other handled types use an {@link
+ * IdentityDataConverter} around a format converter; every type without an explicit {@code
+ * visit} method ends up in {@link #defaultMethod} and is rejected.
+ */
+ private static final class LogicalTypeToDataConverter
+ extends LogicalTypeDefaultVisitor<DataConverter> {
+
+ @Override
+ public DataConverter visit(BooleanType booleanType) {
+ return defaultConverter(booleanType);
+ }
+
+ @Override
+ public DataConverter visit(TinyIntType tinyIntType) {
+ return ByteDataConverter.INSTANCE;
+ }
+
+ @Override
+ public DataConverter visit(SmallIntType smallIntType) {
+ return ShortDataConverter.INSTANCE;
+ }
+
+ @Override
+ public DataConverter visit(IntType intType) {
+ return IntDataConverter.INSTANCE;
+ }
+
+ @Override
+ public DataConverter visit(BigIntType bigIntType) {
+ return defaultConverter(bigIntType);
+ }
+
+ @Override
+ public DataConverter visit(FloatType floatType) {
+ return FloatDataConverter.INSTANCE;
+ }
+
+ @Override
+ public DataConverter visit(DoubleType doubleType) {
+ return defaultConverter(doubleType);
+ }
+
+ @Override
+ public DataConverter visit(DecimalType decimalType) {
+ return defaultConverter(decimalType);
+ }
+
+ @Override
+ public DataConverter visit(VarCharType varCharType) {
+ return defaultConverter(varCharType);
+ }
+
+ @Override
+ public DataConverter visit(CharType charType) {
+ return defaultConverter(charType);
+ }
+
+ @Override
+ public DataConverter visit(VarBinaryType varBinaryType) {
+ return defaultConverter(varBinaryType);
+ }
+
+ @Override
+ public DataConverter visit(BinaryType binaryType) {
+ return defaultConverter(binaryType);
+ }
+
+ // DATE uses the fixed DateConverter instead of the data-type based lookup.
+ @Override
+ public DataConverter visit(DateType dateType) {
+ return new IdentityDataConverter<>(DataFormatConverters.DateConverter.INSTANCE);
+ }
+
+ @Override
+ public DataConverter visit(TimeType timeType) {
+ return TimeDataConverter.INSTANCE;
+ }
+
+ // TIMESTAMP needs a converter instance configured with the precision of the type.
+ @Override
+ public DataConverter visit(TimestampType timestampType) {
+ return new IdentityDataConverter<>(
+ new DataFormatConverters.TimestampConverter(timestampType.getPrecision()));
+ }
+
+ @Override
+ public DataConverter visit(ArrayType arrayType) {
+ return defaultConverter(arrayType);
+ }
+
+ @Override
+ public DataConverter visit(MapType mapType) {
+ return defaultConverter(mapType);
+ }
+
+ // Fallback for every logical type that has no visit method above: not supported.
+ @Override
+ protected DataConverter defaultMethod(LogicalType logicalType) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Currently, Python UDF doesn't support logical type %s in Thread Mode.",
+ logicalType.asSummaryString()));
+ }
+
+ // Wraps the format converter that DataFormatConverters provides for the data type
+ // derived from the logical type into an identity converter.
+ @SuppressWarnings("unchecked")
+ private DataConverter defaultConverter(LogicalType logicalType) {
+ return new IdentityDataConverter<>(
+ DataFormatConverters.getConverterForDataType(
+ TypeConversions.fromLogicalToDataType(logicalType)));
+ }
+ }
}
diff --git a/flink-python/src/main/resources/META-INF/NOTICE b/flink-python/src/main/resources/META-INF/NOTICE
index 21d0508..fa97855 100644
--- a/flink-python/src/main/resources/META-INF/NOTICE
+++ b/flink-python/src/main/resources/META-INF/NOTICE
@@ -27,6 +27,7 @@ This project bundles the following dependencies under the Apache Software Licens
- org.apache.beam:beam-vendor-sdks-java-extensions-protobuf:2.27.0
- org.apache.beam:beam-vendor-guava-26_0-jre:0.1
- org.apache.beam:beam-vendor-grpc-1_26_0:0.3
+- com.alibaba:pemja:0.1.2
This project bundles the following dependencies under the BSD license.
See bundled license files for details
diff --git a/flink-python/src/test/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManagerTest.java b/flink-python/src/test/java/org/apache/flink/python/env/process/ProcessPythonEnvironmentManagerTest.java
similarity index 95%
rename from flink-python/src/test/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManagerTest.java
rename to flink-python/src/test/java/org/apache/flink/python/env/process/ProcessPythonEnvironmentManagerTest.java
index b1a16be..aca4748 100644
--- a/flink-python/src/test/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManagerTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/env/process/ProcessPythonEnvironmentManagerTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.python.env.beam;
+package org.apache.flink.python.env.process;
import org.apache.flink.api.common.JobID;
import org.apache.flink.python.env.PythonDependencyInfo;
@@ -44,14 +44,14 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYFLINK_GATEWAY_DISABLED;
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_ARCHIVES_DIR;
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_FILES_DIR;
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_REQUIREMENTS_CACHE;
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_REQUIREMENTS_DIR;
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_REQUIREMENTS_FILE;
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_REQUIREMENTS_INSTALL_DIR;
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_WORKING_DIR;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYFLINK_GATEWAY_DISABLED;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYTHON_ARCHIVES_DIR;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYTHON_FILES_DIR;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYTHON_REQUIREMENTS_CACHE;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYTHON_REQUIREMENTS_DIR;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYTHON_REQUIREMENTS_FILE;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYTHON_REQUIREMENTS_INSTALL_DIR;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYTHON_WORKING_DIR;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java
index cde0ef0..a5bf90a 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java
@@ -196,7 +196,7 @@ public class PassThroughPythonStreamGroupWindowAggregateOperator<K>
public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
return new PassThroughStreamGroupWindowAggregatePythonFunctionRunner(
getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
+ PythonTestUtils.createTestProcessEnvironmentManager(),
userDefinedFunctionInputType,
userDefinedFunctionOutputType,
STREAM_GROUP_WINDOW_AGGREGATE_URN,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java
index 54a6adf..d5472fc 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java
@@ -257,7 +257,7 @@ public class PythonStreamGroupAggregateOperatorTest
public PythonFunctionRunner createPythonFunctionRunner() {
return new PassThroughStreamAggregatePythonFunctionRunner(
getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
+ PythonTestUtils.createTestProcessEnvironmentManager(),
userDefinedFunctionInputType,
outputType,
STREAM_GROUP_AGGREGATE_URN,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java
index f325b8a..bdef22d 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java
@@ -270,7 +270,7 @@ public class PythonStreamGroupTableAggregateOperatorTest
public PythonFunctionRunner createPythonFunctionRunner() {
return new PassThroughStreamTableAggregatePythonFunctionRunner(
getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
+ PythonTestUtils.createTestProcessEnvironmentManager(),
userDefinedFunctionInputType,
outputType,
STREAM_GROUP_TABLE_AGGREGATE_URN,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java
index 254cb86..e758119 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java
@@ -233,7 +233,7 @@ public class BatchArrowPythonGroupAggregateFunctionOperatorTest
public PythonFunctionRunner createPythonFunctionRunner() {
return new PassThroughPythonAggregateFunctionRunner(
getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
+ PythonTestUtils.createTestProcessEnvironmentManager(),
udfInputType,
udfOutputType,
getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java
index d631bf5..a6ec6db 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java
@@ -402,7 +402,7 @@ public class BatchArrowPythonGroupWindowAggregateFunctionOperatorTest
public PythonFunctionRunner createPythonFunctionRunner() {
return new PassThroughPythonAggregateFunctionRunner(
getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
+ PythonTestUtils.createTestProcessEnvironmentManager(),
udfInputType,
udfOutputType,
getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java
index c272ed4..1fee476 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java
@@ -303,7 +303,7 @@ public class BatchArrowPythonOverWindowAggregateFunctionOperatorTest
public PythonFunctionRunner createPythonFunctionRunner() {
return new PassThroughPythonAggregateFunctionRunner(
getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
+ PythonTestUtils.createTestProcessEnvironmentManager(),
udfInputType,
udfOutputType,
getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
index dae1313..e762222 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
@@ -517,7 +517,7 @@ public class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
public PythonFunctionRunner createPythonFunctionRunner() {
return new PassThroughPythonAggregateFunctionRunner(
getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
+ PythonTestUtils.createTestProcessEnvironmentManager(),
udfInputType,
udfOutputType,
getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java
index 9c410d9..a95157c 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java
@@ -163,7 +163,7 @@ public class StreamArrowPythonProcTimeBoundedRangeOperatorTest
public PythonFunctionRunner createPythonFunctionRunner() {
return new PassThroughPythonAggregateFunctionRunner(
getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
+ PythonTestUtils.createTestProcessEnvironmentManager(),
udfInputType,
udfOutputType,
getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java
index 3664a1e..b65a0fb 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java
@@ -164,7 +164,7 @@ public class StreamArrowPythonProcTimeBoundedRowsOperatorTest
public PythonFunctionRunner createPythonFunctionRunner() {
return new PassThroughPythonAggregateFunctionRunner(
getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
+ PythonTestUtils.createTestProcessEnvironmentManager(),
udfInputType,
udfOutputType,
getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
index b7cf02b..849c6b0 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
@@ -319,7 +319,7 @@ public class StreamArrowPythonRowTimeBoundedRangeOperatorTest
public PythonFunctionRunner createPythonFunctionRunner() {
return new PassThroughPythonAggregateFunctionRunner(
getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
+ PythonTestUtils.createTestProcessEnvironmentManager(),
udfInputType,
udfOutputType,
getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
index acc6c82..1ee0e7f 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
@@ -289,7 +289,7 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
public PythonFunctionRunner createPythonFunctionRunner() {
return new PassThroughPythonAggregateFunctionRunner(
getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
+ PythonTestUtils.createTestProcessEnvironmentManager(),
udfInputType,
udfOutputType,
getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
index 96c7d88..17ae33c 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
@@ -147,7 +147,7 @@ public class PythonScalarFunctionOperatorTest
public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
return new PassThroughPythonScalarFunctionRunner(
getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
+ PythonTestUtils.createTestProcessEnvironmentManager(),
udfInputType,
udfOutputType,
getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
index 5e52d80..953afb0 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
@@ -146,7 +146,7 @@ public class ArrowPythonScalarFunctionOperatorTest
public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
return new PassThroughPythonScalarFunctionRunner(
getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
+ PythonTestUtils.createTestProcessEnvironmentManager(),
udfInputType,
udfOutputType,
getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
index 75ad3e7..840503c 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
@@ -128,7 +128,7 @@ public class PythonTableFunctionOperatorTest
public PythonFunctionRunner createPythonFunctionRunner() {
return new PassThroughPythonTableFunctionRunner(
getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
+ PythonTestUtils.createTestProcessEnvironmentManager(),
udfInputType,
udfOutputType,
getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java
index e79d4d4..4804a9d 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.runtime.typeutils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
@@ -64,6 +66,17 @@ public class PythonTypeUtilsTest {
}
@Test
+ public void testLogicalTypeToDataConverter() {
+ PythonTypeUtils.DataConverter converter = PythonTypeUtils.toDataConverter(new IntType());
+
+ GenericRowData data = new GenericRowData(1);
+ data.setField(0, 10);
+ Object externalData = converter.toExternal(data, 0);
+ assertTrue(externalData instanceof Long);
+ assertEquals(externalData, 10L);
+ }
+
+ @Test
public void testUnsupportedTypeSerializer() {
LogicalType logicalType =
new UnresolvedUserDefinedType(UnresolvedIdentifier.of("cat", "db", "MyType"));
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
index 0752bbd..fb16068 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
@@ -24,7 +24,7 @@ import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonConfig;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
@@ -67,7 +67,7 @@ public class PassThroughPythonAggregateFunctionRunner extends BeamTablePythonFun
public PassThroughPythonAggregateFunctionRunner(
String taskName,
- PythonEnvironmentManager environmentManager,
+ ProcessPythonEnvironmentManager environmentManager,
RowType inputType,
RowType outputType,
String functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
index 7638e56..4815507 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
@@ -19,7 +19,7 @@
package org.apache.flink.table.runtime.utils;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;
@@ -43,7 +43,7 @@ public class PassThroughPythonScalarFunctionRunner extends BeamTablePythonFuncti
public PassThroughPythonScalarFunctionRunner(
String taskName,
- PythonEnvironmentManager environmentManager,
+ ProcessPythonEnvironmentManager environmentManager,
RowType inputType,
RowType outputType,
String functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
index e4cd02d..9469422 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
@@ -19,7 +19,7 @@
package org.apache.flink.table.runtime.utils;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;
@@ -45,7 +45,7 @@ public class PassThroughPythonTableFunctionRunner extends BeamTablePythonFunctio
public PassThroughPythonTableFunctionRunner(
String taskName,
- PythonEnvironmentManager environmentManager,
+ ProcessPythonEnvironmentManager environmentManager,
RowType inputType,
RowType outputType,
String functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
index 14e147f..e2886005 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.utils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
@@ -49,7 +49,7 @@ public class PassThroughStreamAggregatePythonFunctionRunner extends BeamTablePyt
public PassThroughStreamAggregatePythonFunctionRunner(
String taskName,
- PythonEnvironmentManager environmentManager,
+ ProcessPythonEnvironmentManager environmentManager,
RowType inputType,
RowType outputType,
String functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
index 12e0e2f..7f977f2 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.utils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.runtime.operators.python.aggregate.PassThroughPythonStreamGroupWindowAggregateOperator;
@@ -45,7 +45,7 @@ public class PassThroughStreamGroupWindowAggregatePythonFunctionRunner
public PassThroughStreamGroupWindowAggregatePythonFunctionRunner(
String taskName,
- PythonEnvironmentManager environmentManager,
+ ProcessPythonEnvironmentManager environmentManager,
RowType inputType,
RowType outputType,
String functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
index b84b63a..f9ed8ac 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.utils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
@@ -51,7 +51,7 @@ public class PassThroughStreamTableAggregatePythonFunctionRunner
public PassThroughStreamTableAggregatePythonFunctionRunner(
String taskName,
- PythonEnvironmentManager environmentManager,
+ ProcessPythonEnvironmentManager environmentManager,
RowType inputType,
RowType outputType,
String functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
index 82ac09b..932d9fa 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
@@ -20,8 +20,7 @@ package org.apache.flink.table.runtime.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.python.env.PythonDependencyInfo;
-import org.apache.flink.python.env.PythonEnvironmentManager;
-import org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.python.util.PythonEnvironmentManagerUtils;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
@@ -86,7 +85,7 @@ public final class PythonTestUtils {
"root"));
}
- public static PythonEnvironmentManager createTestEnvironmentManager() {
+ public static ProcessPythonEnvironmentManager createTestProcessEnvironmentManager() {
Map<String, String> env = new HashMap<>();
env.put(PythonEnvironmentManagerUtils.PYFLINK_UDF_RUNNER_DIR, "");
return new ProcessPythonEnvironmentManager(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
index a877824..db17df4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
@@ -74,6 +74,10 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
"org.apache.flink.table.runtime.operators.python.scalar."
+ "PythonScalarFunctionOperator";
+ private static final String EMBEDDED_PYTHON_SCALAR_FUNCTION_OPERATOR_NAME =
+ "org.apache.flink.table.runtime.operators.python.scalar."
+ + "EmbeddedPythonScalarFunctionOperator";
+
private static final String ARROW_PYTHON_SCALAR_FUNCTION_OPERATOR_NAME =
"org.apache.flink.table.runtime.operators.python.scalar.arrow."
+ "ArrowPythonScalarFunctionOperator";
@@ -207,10 +211,15 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
int[] forwardedFields,
boolean isArrow) {
Class<?> clazz;
+ boolean isInProcessMode = CommonPythonUtil.isPythonWorkerInProcessMode(mergedConfig);
if (isArrow) {
clazz = CommonPythonUtil.loadClass(ARROW_PYTHON_SCALAR_FUNCTION_OPERATOR_NAME);
} else {
- clazz = CommonPythonUtil.loadClass(PYTHON_SCALAR_FUNCTION_OPERATOR_NAME);
+ if (isInProcessMode) {
+ clazz = CommonPythonUtil.loadClass(PYTHON_SCALAR_FUNCTION_OPERATOR_NAME);
+ } else {
+ clazz = CommonPythonUtil.loadClass(EMBEDDED_PYTHON_SCALAR_FUNCTION_OPERATOR_NAME);
+ }
}
final RowType inputType = inputRowTypeInfo.toRowType();
@@ -224,34 +233,79 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
.project(outputType);
try {
- Constructor<?> ctor =
- clazz.getConstructor(
- Configuration.class,
- PythonFunctionInfo[].class,
- RowType.class,
- RowType.class,
- RowType.class,
- GeneratedProjection.class,
- GeneratedProjection.class);
- return (OneInputStreamOperator<RowData, RowData>)
- ctor.newInstance(
- mergedConfig,
- pythonFunctionInfos,
- inputType,
- udfInputType,
- udfOutputType,
- ProjectionCodeGenerator.generateProjection(
- CodeGeneratorContext.apply(tableConfig),
- "UdfInputProjection",
+ if (isInProcessMode) {
+ Constructor<?> ctor =
+ clazz.getConstructor(
+ Configuration.class,
+ PythonFunctionInfo[].class,
+ RowType.class,
+ RowType.class,
+ RowType.class,
+ GeneratedProjection.class,
+ GeneratedProjection.class);
+ return (OneInputStreamOperator<RowData, RowData>)
+ ctor.newInstance(
+ mergedConfig,
+ pythonFunctionInfos,
+ inputType,
+ udfInputType,
+ udfOutputType,
+ ProjectionCodeGenerator.generateProjection(
+ CodeGeneratorContext.apply(tableConfig),
+ "UdfInputProjection",
+ inputType,
+ udfInputType,
+ udfInputOffsets),
+ ProjectionCodeGenerator.generateProjection(
+ CodeGeneratorContext.apply(tableConfig),
+ "ForwardedFieldProjection",
+ inputType,
+ forwardedFieldType,
+ forwardedFields));
+ } else {
+ if (forwardedFields.length > 0) {
+ Constructor<?> ctor =
+ clazz.getConstructor(
+ Configuration.class,
+ PythonFunctionInfo[].class,
+ RowType.class,
+ RowType.class,
+ RowType.class,
+ int[].class,
+ GeneratedProjection.class);
+ return (OneInputStreamOperator<RowData, RowData>)
+ ctor.newInstance(
+ mergedConfig,
+ pythonFunctionInfos,
inputType,
udfInputType,
- udfInputOffsets),
- ProjectionCodeGenerator.generateProjection(
- CodeGeneratorContext.apply(tableConfig),
- "ForwardedFieldProjection",
+ udfOutputType,
+ udfInputOffsets,
+ ProjectionCodeGenerator.generateProjection(
+ CodeGeneratorContext.apply(tableConfig),
+ "ForwardedFieldProjection",
+ inputType,
+ forwardedFieldType,
+ forwardedFields));
+ } else {
+ Constructor<?> ctor =
+ clazz.getConstructor(
+ Configuration.class,
+ PythonFunctionInfo[].class,
+ RowType.class,
+ RowType.class,
+ RowType.class,
+ int[].class);
+ return (OneInputStreamOperator<RowData, RowData>)
+ ctor.newInstance(
+ mergedConfig,
+ pythonFunctionInfos,
inputType,
- forwardedFieldType,
- forwardedFields));
+ udfInputType,
+ udfOutputType,
+ udfInputOffsets);
+ }
+ }
} catch (Exception e) {
throw new TableException("Python Scalar Function Operator constructed failed.", e);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
index a7c3024..87b9b33 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
@@ -158,6 +158,20 @@ public class CommonPythonUtil {
}
}
+ @SuppressWarnings("unchecked")
+ public static boolean isPythonWorkerInProcessMode(Configuration config) {
+ Class clazz = loadClass("org.apache.flink.python.PythonOptions");
+ try {
+ return config.getString(
+ (ConfigOption<String>)
+ (clazz.getField("PYTHON_EXECUTION_MODE").get(null)))
+ .equalsIgnoreCase("process");
+
+ } catch (IllegalAccessException | NoSuchFieldException e) {
+ throw new TableException("Field PYTHON_EXECUTION_MODE accessed failed.", e);
+ }
+ }
+
public static Tuple2<PythonAggregateFunctionInfo[], DataViewSpec[][]>
extractPythonAggregateFunctionInfos(
AggregateInfoList pythonAggregateInfoList, AggregateCall[] aggCalls) {