Posted to commits@flink.apache.org by hx...@apache.org on 2022/01/27 02:16:47 UTC

[flink] branch master updated: [FLINK-25719][python] Support General Python UDF in Thread Mode

This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 51eb386  [FLINK-25719][python] Support General Python UDF in Thread Mode
51eb386 is described below

commit 51eb386048484502f88d8806b75c17b04b91a613
Author: huangxingbo <hx...@gmail.com>
AuthorDate: Thu Jan 20 17:47:20 2022 +0800

    [FLINK-25719][python] Support General Python UDF in Thread Mode
    
    This closes #18418.
---
 .../shortcodes/generated/python_configuration.html |   6 +
 flink-python/dev/dev-requirements.txt              |   1 +
 flink-python/pom.xml                               |   8 +
 .../pyflink/fn_execution/beam/beam_operations.py   |  17 +-
 .../fn_execution/beam/beam_operations_fast.pyx     |   4 +-
 .../fn_execution/beam/beam_operations_slow.py      |   4 +-
 .../pyflink/fn_execution/coder_impl_fast.pyx       |   5 +-
 .../pyflink/fn_execution/coder_impl_slow.py        |   5 +-
 flink-python/pyflink/fn_execution/coders.py        |  90 +++---
 .../pyflink/fn_execution/datastream/operations.py  |  33 ++-
 .../fn_execution/datastream/runtime_context.py     |   5 +-
 .../fn_execution/datastream/timerservice_impl.py   |  16 +-
 .../datastream/window/window_operator.py           |   3 +-
 .../pyflink/fn_execution/table/aggregate_fast.pyx  |   7 +-
 .../pyflink/fn_execution/table/aggregate_slow.py   |   7 +-
 .../pyflink/fn_execution/table/operations.py       |  84 +++---
 .../pyflink/fn_execution/table/state_data_view.py  |   7 +-
 .../fn_execution/table/window_aggregate_fast.pyx   |   5 +-
 .../fn_execution/table/window_aggregate_slow.py    |   5 +-
 .../pyflink/fn_execution/table/window_context.py   |   9 +-
 .../fn_execution/table/window_process_function.py  |   4 +-
 .../fn_execution/tests/test_process_mode_boot.py   |   2 +-
 .../pyflink/fn_execution/utils/operation_utils.py  |  51 +++-
 .../pyflink/table/tests/test_dependency.py         |  70 +++--
 flink-python/pyflink/table/tests/test_udf.py       |  51 +++-
 flink-python/setup.py                              |   1 +
 .../java/org/apache/flink/python/PythonConfig.java |   8 +
 .../org/apache/flink/python/PythonOptions.java     |  13 +
 ....java => AbstractPythonEnvironmentManager.java} | 317 +++++++++------------
 .../flink/python/env/PythonDependencyInfo.java     |  32 ++-
 .../flink/python/env/PythonEnvironmentManager.java |  20 +-
 .../EmbeddedPythonEnvironment.java}                |  23 +-
 .../embedded/EmbeddedPythonEnvironmentManager.java |  84 ++++++
 .../{ => process}/ProcessPythonEnvironment.java    |   3 +-
 .../process/ProcessPythonEnvironmentManager.java   | 100 +++++++
 .../python/util/PythonEnvironmentManagerUtils.java |   2 +-
 .../AbstractDataStreamPythonFunctionOperator.java  |   2 +-
 .../AbstractEmbeddedPythonFunctionOperator.java    | 166 +++++++++++
 .../AbstractExternalPythonFunctionOperator.java    | 154 ++++++++++
 .../python/AbstractPythonFunctionOperator.java     | 110 +------
 .../beam/BeamDataStreamPythonFunctionRunner.java   |   4 +-
 .../python/beam/BeamPythonFunctionRunner.java      |  10 +-
 .../AbstractOneInputPythonFunctionOperator.java    |   4 +-
 .../EmbeddedPythonScalarFunctionOperator.java      | 303 ++++++++++++++++++++
 .../python/beam/BeamTablePythonFunctionRunner.java |   8 +-
 .../table/runtime/typeutils/PythonTypeUtils.java   | 272 ++++++++++++++++++
 flink-python/src/main/resources/META-INF/NOTICE    |   1 +
 .../ProcessPythonEnvironmentManagerTest.java       |  18 +-
 ...ghPythonStreamGroupWindowAggregateOperator.java |   2 +-
 .../PythonStreamGroupAggregateOperatorTest.java    |   2 +-
 ...ythonStreamGroupTableAggregateOperatorTest.java |   2 +-
 ...owPythonGroupAggregateFunctionOperatorTest.java |   2 +-
 ...onGroupWindowAggregateFunctionOperatorTest.java |   2 +-
 ...honOverWindowAggregateFunctionOperatorTest.java |   2 +-
 ...onGroupWindowAggregateFunctionOperatorTest.java |   2 +-
 ...rrowPythonProcTimeBoundedRangeOperatorTest.java |   2 +-
 ...ArrowPythonProcTimeBoundedRowsOperatorTest.java |   2 +-
 ...ArrowPythonRowTimeBoundedRangeOperatorTest.java |   2 +-
 ...mArrowPythonRowTimeBoundedRowsOperatorTest.java |   2 +-
 .../scalar/PythonScalarFunctionOperatorTest.java   |   2 +-
 .../ArrowPythonScalarFunctionOperatorTest.java     |   2 +-
 .../table/PythonTableFunctionOperatorTest.java     |   2 +-
 .../runtime/typeutils/PythonTypeUtilsTest.java     |  13 +
 .../PassThroughPythonAggregateFunctionRunner.java  |   4 +-
 .../PassThroughPythonScalarFunctionRunner.java     |   4 +-
 .../PassThroughPythonTableFunctionRunner.java      |   4 +-
 ...ThroughStreamAggregatePythonFunctionRunner.java |   4 +-
 ...amGroupWindowAggregatePythonFunctionRunner.java |   4 +-
 ...ghStreamTableAggregatePythonFunctionRunner.java |   4 +-
 .../flink/table/runtime/utils/PythonTestUtils.java |   5 +-
 .../nodes/exec/common/CommonExecPythonCalc.java    | 106 +++++--
 .../plan/nodes/exec/utils/CommonPythonUtil.java    |  14 +
 72 files changed, 1762 insertions(+), 582 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/python_configuration.html b/docs/layouts/shortcodes/generated/python_configuration.html
index b9d0559..9ad42d8 100644
--- a/docs/layouts/shortcodes/generated/python_configuration.html
+++ b/docs/layouts/shortcodes/generated/python_configuration.html
@@ -27,6 +27,12 @@
             <td>Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.6+, Apache Beam (version == 2.27.0), Pip (version &gt;= 7.1.0) and SetupTools (version &gt;= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec".</td>
         </tr>
         <tr>
+            <td><h5>python.execution-mode</h5></td>
+            <td style="word-wrap: break-word;">"process"</td>
+            <td>String</td>
+            <td>Specify the python runtime execution mode. The optional values are `process`, `multi-thread` and `sub-interpreter`. The `process` mode means that the Python user-defined functions will be executed in separate Python process. The `multi-thread` mode means that the Python user-defined functions will be executed in the same thread as Java Operator, but it will be affected by GIL performance. The `sub-interpreter` mode means that the Python user-defined functions will be exec [...]
+        </tr>
+        <tr>
             <td><h5>python.files</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
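
For reference, the new python.execution-mode option can be set from PyFlink like any
other Python configuration option. A minimal sketch, assuming the standard
TableEnvironment/Configuration API (set_string) and using the values documented above:

    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # Run general Python UDFs in the same thread as the Java operator rather than
    # in a separate Python process; the default remains "process", and
    # "sub-interpreter" is the other embedded mode added by this change.
    t_env.get_config().get_configuration().set_string(
        "python.execution-mode", "multi-thread")
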
diff --git a/flink-python/dev/dev-requirements.txt b/flink-python/dev/dev-requirements.txt
index c6b0ef30..bd75137 100755
--- a/flink-python/dev/dev-requirements.txt
+++ b/flink-python/dev/dev-requirements.txt
@@ -27,3 +27,4 @@ numpy>=1.14.3,<1.20
 fastavro>=0.21.4,<0.24
 grpcio>=1.29.0,<2
 grpcio-tools>=1.3.5,<=1.14.2
+pemja==0.1.2; python_version >= '3.7'
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index b14e5d1..304a590 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -99,6 +99,13 @@ under the License.
 			<artifactId>beam-runners-core-java</artifactId>
 		</dependency>
 
+		<!-- PemJa dependencies -->
+		<dependency>
+			<groupId>com.alibaba</groupId>
+			<artifactId>pemja</artifactId>
+			<version>0.1.2</version>
+		</dependency>
+
 		<!-- Protobuf dependencies -->
 
 		<dependency>
@@ -375,6 +382,7 @@ under the License.
 									<include>org.apache.arrow:*</include>
 									<include>io.netty:*</include>
 									<include>com.google.flatbuffers:*</include>
+									<include>com.alibaba:pemja</include>
 								</includes>
 							</artifactSet>
 							<filters>
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations.py b/flink-python/pyflink/fn_execution/beam/beam_operations.py
index 04fc5d4..17074b2 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations.py
@@ -151,12 +151,13 @@ def _create_user_defined_function_operation(factory, transform_proto, consumers,
         side_inputs=None,
         output_coders=[output_coders[tag] for tag in output_tags])
 
-    if hasattr(spec.serialized_fn, "key_type"):
+    serialized_fn = spec.serialized_fn
+    if hasattr(serialized_fn, "key_type"):
         # keyed operation, need to create the KeyedStateBackend.
-        row_schema = spec.serialized_fn.key_type.row_schema
+        row_schema = serialized_fn.key_type.row_schema
         key_row_coder = FlattenRowCoder([from_proto(f.type) for f in row_schema.fields])
-        if spec.serialized_fn.HasField('group_window'):
-            if spec.serialized_fn.group_window.is_time_window:
+        if serialized_fn.HasField('group_window'):
+            if serialized_fn.group_window.is_time_window:
                 window_coder = TimeWindowCoder()
             else:
                 window_coder = CountWindowCoder()
@@ -166,9 +167,9 @@ def _create_user_defined_function_operation(factory, transform_proto, consumers,
             factory.state_handler,
             key_row_coder,
             window_coder,
-            spec.serialized_fn.state_cache_size,
-            spec.serialized_fn.map_state_read_cache_size,
-            spec.serialized_fn.map_state_write_cache_size)
+            serialized_fn.state_cache_size,
+            serialized_fn.map_state_read_cache_size,
+            serialized_fn.map_state_write_cache_size)
 
         return beam_operation_cls(
             transform_proto.unique_name,
@@ -179,7 +180,7 @@ def _create_user_defined_function_operation(factory, transform_proto, consumers,
             internal_operation_cls,
             keyed_state_backend)
     elif internal_operation_cls == datastream_operations.StatefulOperation:
-        key_row_coder = from_type_info_proto(spec.serialized_fn.key_type_info)
+        key_row_coder = from_type_info_proto(serialized_fn.key_type_info)
         keyed_state_backend = RemoteKeyedStateBackend(
             factory.state_handler,
             key_row_coder,
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
index c7e5a09..a9ace9d 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
@@ -199,7 +199,7 @@ cdef class StatelessFunctionOperation(FunctionOperation):
             name, spec, counter_factory, sampler, consumers, operation_cls)
 
     cdef object generate_operation(self):
-        return self.operation_cls(self.spec)
+        return self.operation_cls(self.spec.serialized_fn)
 
 
 cdef class StatefulFunctionOperation(FunctionOperation):
@@ -211,7 +211,7 @@ cdef class StatefulFunctionOperation(FunctionOperation):
             name, spec, counter_factory, sampler, consumers, operation_cls)
 
     cdef object generate_operation(self):
-        return self.operation_cls(self.spec, self._keyed_state_backend)
+        return self.operation_cls(self.spec.serialized_fn, self._keyed_state_backend)
 
     cpdef void add_timer_info(self, timer_family_id, timer_info):
         # ignore timer_family_id
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py b/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
index 809a4b0..57a375a 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
@@ -149,7 +149,7 @@ class StatelessFunctionOperation(FunctionOperation):
             name, spec, counter_factory, sampler, consumers, operation_cls)
 
     def generate_operation(self):
-        return self.operation_cls(self.spec)
+        return self.operation_cls(self.spec.serialized_fn)
 
 
 class StatefulFunctionOperation(FunctionOperation):
@@ -161,7 +161,7 @@ class StatefulFunctionOperation(FunctionOperation):
             name, spec, counter_factory, sampler, consumers, operation_cls)
 
     def generate_operation(self):
-        return self.operation_cls(self.spec, self._keyed_state_backend)
+        return self.operation_cls(self.spec.serialized_fn, self._keyed_state_backend)
 
     def add_timer_info(self, timer_family_id: str, timer_info: TimerInfo):
         # ignore timer_family_id
diff --git a/flink-python/pyflink/fn_execution/coder_impl_fast.pyx b/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
index c99ff5c..75c7706 100644
--- a/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
+++ b/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
@@ -28,7 +28,6 @@ import pickle
 from typing import List, Union
 
 import cloudpickle
-import pyarrow as pa
 
 from pyflink.common import Row, RowKind
 from pyflink.common.time import Instant
@@ -437,6 +436,8 @@ cdef class ArrowCoderImpl(FieldCoderImpl):
         self._batch_reader = self._load_from_stream(self._resettable_io)
 
     cpdef encode_to_stream(self, cols, OutputStream out_stream):
+        import pyarrow as pa
+
         self._resettable_io.set_output_stream(out_stream)
         batch_writer = pa.RecordBatchStreamWriter(self._resettable_io, self._schema)
         batch_writer.write_batch(
@@ -451,6 +452,8 @@ cdef class ArrowCoderImpl(FieldCoderImpl):
         return arrow_to_pandas(self._timezone, self._field_types, [next(self._batch_reader)])
 
     def _load_from_stream(self, stream):
+        import pyarrow as pa
+
         while stream.readable():
             reader = pa.ipc.open_stream(stream)
             yield reader.read_next_batch()
diff --git a/flink-python/pyflink/fn_execution/coder_impl_slow.py b/flink-python/pyflink/fn_execution/coder_impl_slow.py
index 77aff11..dc3c186 100644
--- a/flink-python/pyflink/fn_execution/coder_impl_slow.py
+++ b/flink-python/pyflink/fn_execution/coder_impl_slow.py
@@ -22,7 +22,6 @@ from abc import ABC, abstractmethod
 from typing import List
 
 import cloudpickle
-import pyarrow as pa
 
 from pyflink.common import Row, RowKind
 from pyflink.common.time import Instant
@@ -281,6 +280,8 @@ class ArrowCoderImpl(FieldCoderImpl):
         self._batch_reader = ArrowCoderImpl._load_from_stream(self._resettable_io)
 
     def encode_to_stream(self, cols, out_stream: OutputStream):
+        import pyarrow as pa
+
         self._resettable_io.set_output_stream(out_stream)
         batch_writer = pa.RecordBatchStreamWriter(self._resettable_io, self._schema)
         batch_writer.write_batch(
@@ -296,6 +297,8 @@ class ArrowCoderImpl(FieldCoderImpl):
 
     @staticmethod
     def _load_from_stream(stream):
+        import pyarrow as pa
+
         while stream.readable():
             reader = pa.ipc.open_stream(stream)
             yield reader.read_next_batch()
diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py
index 8ef5607..2ddd2f3 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -19,14 +19,12 @@
 import os
 from abc import ABC, abstractmethod
 
-import pyarrow as pa
 import pytz
 
 from pyflink.common.typeinfo import TypeInformation, BasicTypeInfo, BasicType, DateTypeInfo, \
     TimeTypeInfo, TimestampTypeInfo, PrimitiveArrayTypeInfo, BasicArrayTypeInfo, TupleTypeInfo, \
     MapTypeInfo, ListTypeInfo, RowTypeInfo, PickledBytesTypeInfo, ObjectArrayTypeInfo, \
     ExternalTypeInfo
-from pyflink.fn_execution import flink_fn_execution_pb2
 from pyflink.table.types import TinyIntType, SmallIntType, IntType, BigIntType, BooleanType, \
     FloatType, DoubleType, VarCharType, VarBinaryType, DecimalType, DateType, TimeType, \
     LocalZonedTimestampType, RowType, RowField, to_arrow_type, TimestampType, ArrayType
@@ -59,6 +57,8 @@ class LengthPrefixBaseCoder(ABC):
 
     @classmethod
     def from_coder_info_descriptor_proto(cls, coder_info_descriptor_proto):
+        from pyflink.fn_execution import flink_fn_execution_pb2
+
         field_coder = cls._to_field_coder(coder_info_descriptor_proto)
         mode = coder_info_descriptor_proto.mode
         separated_with_end_message = coder_info_descriptor_proto.separated_with_end_message
@@ -98,11 +98,15 @@ class LengthPrefixBaseCoder(ABC):
 
     @classmethod
     def _to_arrow_schema(cls, row_type):
+        import pyarrow as pa
+
         return pa.schema([pa.field(n, to_arrow_type(t), t._nullable)
                           for n, t in zip(row_type.field_names(), row_type.field_types())])
 
     @classmethod
     def _to_data_type(cls, field_type):
+        from pyflink.fn_execution import flink_fn_execution_pb2
+
         if field_type.type_name == flink_fn_execution_pb2.Schema.TINYINT:
             return TinyIntType(field_type.nullable)
         elif field_type.type_name == flink_fn_execution_pb2.Schema.SMALLINT:
@@ -593,24 +597,6 @@ class DataViewFilterCoder(FieldCoder):
         return coder_impl.DataViewFilterCoderImpl(self._udf_data_view_specs)
 
 
-type_name = flink_fn_execution_pb2.Schema
-_type_name_mappings = {
-    type_name.TINYINT: TinyIntCoder(),
-    type_name.SMALLINT: SmallIntCoder(),
-    type_name.INT: IntCoder(),
-    type_name.BIGINT: BigIntCoder(),
-    type_name.BOOLEAN: BooleanCoder(),
-    type_name.FLOAT: FloatCoder(),
-    type_name.DOUBLE: DoubleCoder(),
-    type_name.BINARY: BinaryCoder(),
-    type_name.VARBINARY: BinaryCoder(),
-    type_name.CHAR: CharCoder(),
-    type_name.VARCHAR: CharCoder(),
-    type_name.DATE: DateCoder(),
-    type_name.TIME: TimeCoder(),
-}
-
-
 def from_proto(field_type):
     """
     Creates the corresponding :class:`Coder` given the protocol representation of the field type.
@@ -618,6 +604,25 @@ def from_proto(field_type):
     :param field_type: the protocol representation of the field type
     :return: :class:`Coder`
     """
+    from pyflink.fn_execution import flink_fn_execution_pb2
+
+    type_name = flink_fn_execution_pb2.Schema
+    _type_name_mappings = {
+        type_name.TINYINT: TinyIntCoder(),
+        type_name.SMALLINT: SmallIntCoder(),
+        type_name.INT: IntCoder(),
+        type_name.BIGINT: BigIntCoder(),
+        type_name.BOOLEAN: BooleanCoder(),
+        type_name.FLOAT: FloatCoder(),
+        type_name.DOUBLE: DoubleCoder(),
+        type_name.BINARY: BinaryCoder(),
+        type_name.VARBINARY: BinaryCoder(),
+        type_name.CHAR: CharCoder(),
+        type_name.VARCHAR: CharCoder(),
+        type_name.DATE: DateCoder(),
+        type_name.TIME: TimeCoder(),
+    }
+
     field_type_name = field_type.type_name
     coder = _type_name_mappings.get(field_type_name)
     if coder is not None:
@@ -642,29 +647,30 @@ def from_proto(field_type):
         raise ValueError("field_type %s is not supported." % field_type)
 
 
-# for data stream type information.
-type_info_name = flink_fn_execution_pb2.TypeInfo
-_type_info_name_mappings = {
-    type_info_name.STRING: CharCoder(),
-    type_info_name.BYTE: TinyIntCoder(),
-    type_info_name.BOOLEAN: BooleanCoder(),
-    type_info_name.SHORT: SmallIntCoder(),
-    type_info_name.INT: IntCoder(),
-    type_info_name.LONG: BigIntCoder(),
-    type_info_name.FLOAT: FloatCoder(),
-    type_info_name.DOUBLE: DoubleCoder(),
-    type_info_name.CHAR: CharCoder(),
-    type_info_name.BIG_INT: BigIntCoder(),
-    type_info_name.BIG_DEC: BigDecimalCoder(),
-    type_info_name.SQL_DATE: DateCoder(),
-    type_info_name.SQL_TIME: TimeCoder(),
-    type_info_name.SQL_TIMESTAMP: TimestampCoder(3),
-    type_info_name.PICKLED_BYTES: CloudPickleCoder(),
-    type_info_name.INSTANT: InstantCoder()
-}
-
-
 def from_type_info_proto(type_info):
+    # for data stream type information.
+    from pyflink.fn_execution import flink_fn_execution_pb2
+
+    type_info_name = flink_fn_execution_pb2.TypeInfo
+    _type_info_name_mappings = {
+        type_info_name.STRING: CharCoder(),
+        type_info_name.BYTE: TinyIntCoder(),
+        type_info_name.BOOLEAN: BooleanCoder(),
+        type_info_name.SHORT: SmallIntCoder(),
+        type_info_name.INT: IntCoder(),
+        type_info_name.LONG: BigIntCoder(),
+        type_info_name.FLOAT: FloatCoder(),
+        type_info_name.DOUBLE: DoubleCoder(),
+        type_info_name.CHAR: CharCoder(),
+        type_info_name.BIG_INT: BigIntCoder(),
+        type_info_name.BIG_DEC: BigDecimalCoder(),
+        type_info_name.SQL_DATE: DateCoder(),
+        type_info_name.SQL_TIME: TimeCoder(),
+        type_info_name.SQL_TIMESTAMP: TimestampCoder(3),
+        type_info_name.PICKLED_BYTES: CloudPickleCoder(),
+        type_info_name.INSTANT: InstantCoder()
+    }
+
     field_type_name = type_info.type_name
     try:
         return _type_info_name_mappings[field_type_name]
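
A note on the import changes in coders.py (and the coder_impl modules above): the
pyarrow and flink_fn_execution_pb2 imports move from module level into the functions
that use them, presumably so that merely importing these modules does not pull in
those dependencies eagerly in the embedded execution modes. A minimal sketch of the
pattern with illustrative names (not part of the actual module):

    def to_arrow_schema(names_and_types):
        # Deferred import: pyarrow is loaded only when Arrow is actually used,
        # so importing this module stays cheap for code paths that never need it.
        import pyarrow as pa

        return pa.schema([pa.field(name, data_type)
                          for name, data_type in names_and_types])

    # Example usage (requires pyarrow to be installed where this runs):
    # import pyarrow as pa
    # schema = to_arrow_schema([("id", pa.int64()), ("name", pa.string())])
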
diff --git a/flink-python/pyflink/fn_execution/datastream/operations.py b/flink-python/pyflink/fn_execution/datastream/operations.py
index 3d36b30..afaed5a 100644
--- a/flink-python/pyflink/fn_execution/datastream/operations.py
+++ b/flink-python/pyflink/fn_execution/datastream/operations.py
@@ -17,19 +17,15 @@
 ################################################################################
 import abc
 
-from apache_beam.runners.worker.bundle_processor import TimerInfo
-
 from pyflink.common import Row
 from pyflink.common.serializer import VoidNamespaceSerializer
 from pyflink.datastream import TimeDomain, RuntimeContext
-from pyflink.fn_execution import flink_fn_execution_pb2
 from pyflink.fn_execution import pickle
 from pyflink.fn_execution.datastream.process_function import \
     InternalKeyedProcessFunctionOnTimerContext, InternalKeyedProcessFunctionContext, \
     InternalProcessFunctionContext
 from pyflink.fn_execution.datastream.runtime_context import StreamingRuntimeContext
 from pyflink.fn_execution.datastream.window.window_operator import WindowOperator
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
 from pyflink.fn_execution.datastream.timerservice_impl import (
     TimerServiceImpl, InternalTimerServiceImpl, NonKeyedTimerServiceImpl)
 from pyflink.fn_execution.datastream.input_handler import (RunnerInputHandler, TimerHandler,
@@ -43,9 +39,8 @@ DATA_STREAM_STATEFUL_FUNCTION_URN = "flink:transform:ds:stateful_function:v1"
 
 class Operation(abc.ABC):
 
-    def __init__(self, spec):
-        self.spec = spec
-        if self.spec.serialized_fn.metric_enabled:
+    def __init__(self, serialized_fn):
+        if serialized_fn.metric_enabled:
             self.base_metric_group = GenericMetricGroup(None, None)
         else:
             self.base_metric_group = None
@@ -74,13 +69,13 @@ class Operation(abc.ABC):
 
 class StatelessOperation(Operation):
 
-    def __init__(self, spec):
-        super(StatelessOperation, self).__init__(spec)
+    def __init__(self, serialized_fn):
+        super(StatelessOperation, self).__init__(serialized_fn)
         self.open_func, self.close_func, self.process_element_func = \
             extract_stateless_function(
-                user_defined_function_proto=self.spec.serialized_fn,
+                user_defined_function_proto=serialized_fn,
                 runtime_context=StreamingRuntimeContext.of(
-                    self.spec.serialized_fn.runtime_context,
+                    serialized_fn.runtime_context,
                     self.base_metric_group))
 
     def open(self):
@@ -95,15 +90,15 @@ class StatelessOperation(Operation):
 
 class StatefulOperation(Operation):
 
-    def __init__(self, spec, keyed_state_backend):
-        super(StatefulOperation, self).__init__(spec)
+    def __init__(self, serialized_fn, keyed_state_backend):
+        super(StatefulOperation, self).__init__(serialized_fn)
         self.keyed_state_backend = keyed_state_backend
         self.open_func, self.close_func, self.process_element_func, self.process_timer_func, \
             self.internal_timer_service = \
             extract_stateful_function(
-                user_defined_function_proto=self.spec.serialized_fn,
+                user_defined_function_proto=serialized_fn,
                 runtime_context=StreamingRuntimeContext.of(
-                    self.spec.serialized_fn.runtime_context,
+                    serialized_fn.runtime_context,
                     self.base_metric_group,
                     self.keyed_state_backend),
                 keyed_state_backend=self.keyed_state_backend)
@@ -124,7 +119,7 @@ class StatefulOperation(Operation):
     def process_timer(self, timer_data):
         return self.process_timer_func(timer_data)
 
-    def add_timer_info(self, timer_info: TimerInfo):
+    def add_timer_info(self, timer_info):
         self.internal_timer_service.add_timer_info(timer_info)
 
 
@@ -136,6 +131,8 @@ def extract_stateless_function(user_defined_function_proto, runtime_context: Run
     :param user_defined_function_proto: the proto representation of the Python :class:`Function`
     :param runtime_context: the streaming runtime context
     """
+    from pyflink.fn_execution import flink_fn_execution_pb2
+
     func_type = user_defined_function_proto.function_type
     UserDefinedDataStreamFunction = flink_fn_execution_pb2.UserDefinedDataStreamFunction
 
@@ -210,7 +207,9 @@ def extract_stateless_function(user_defined_function_proto, runtime_context: Run
 
 def extract_stateful_function(user_defined_function_proto,
                               runtime_context: RuntimeContext,
-                              keyed_state_backend: RemoteKeyedStateBackend):
+                              keyed_state_backend):
+    from pyflink.fn_execution import flink_fn_execution_pb2
+
     func_type = user_defined_function_proto.function_type
     user_defined_func = pickle.loads(user_defined_function_proto.payload)
     internal_timer_service = InternalTimerServiceImpl(keyed_state_backend)
diff --git a/flink-python/pyflink/fn_execution/datastream/runtime_context.py b/flink-python/pyflink/fn_execution/datastream/runtime_context.py
index 86969f2..07b4acb 100644
--- a/flink-python/pyflink/fn_execution/datastream/runtime_context.py
+++ b/flink-python/pyflink/fn_execution/datastream/runtime_context.py
@@ -15,14 +15,13 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from typing import Dict, Union
+from typing import Dict
 
 from pyflink.datastream import RuntimeContext
 from pyflink.datastream.state import ValueStateDescriptor, ValueState, ListStateDescriptor, \
     ListState, MapStateDescriptor, MapState, ReducingStateDescriptor, ReducingState, \
     AggregatingStateDescriptor, AggregatingState
 from pyflink.fn_execution.coders import from_type_info, MapCoder, GenericArrayCoder
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
 from pyflink.metrics import MetricGroup
 
 
@@ -37,7 +36,7 @@ class StreamingRuntimeContext(RuntimeContext):
                  attempt_number: int,
                  job_parameters: Dict[str, str],
                  metric_group: MetricGroup,
-                 keyed_state_backend: Union[RemoteKeyedStateBackend, None],
+                 keyed_state_backend,
                  in_batch_execution_mode: bool):
         self._task_name = task_name
         self._task_name_with_subtasks = task_name_with_subtasks
diff --git a/flink-python/pyflink/fn_execution/datastream/timerservice_impl.py b/flink-python/pyflink/fn_execution/datastream/timerservice_impl.py
index 46397af..cff83a4 100644
--- a/flink-python/pyflink/fn_execution/datastream/timerservice_impl.py
+++ b/flink-python/pyflink/fn_execution/datastream/timerservice_impl.py
@@ -20,14 +20,9 @@ import time
 from enum import Enum
 from io import BytesIO
 
-from apache_beam.runners.worker.bundle_processor import TimerInfo
-from apache_beam.transforms import userstate
-from apache_beam.transforms.window import GlobalWindow
-
 from pyflink.common import Row
 from pyflink.datastream import TimerService
 from pyflink.fn_execution.datastream.timerservice import InternalTimer, K, N, InternalTimerService
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
 
 
 class TimerOperandType(Enum):
@@ -44,7 +39,7 @@ class LegacyInternalTimerServiceImpl(InternalTimerService[N]):
     TODO: Use InternalTimerServiceImpl instead.
     """
 
-    def __init__(self, keyed_state_backend: RemoteKeyedStateBackend):
+    def __init__(self, keyed_state_backend):
         self._keyed_state_backend = keyed_state_backend
         self._current_watermark = None
         self.timers = collections.OrderedDict()
@@ -85,14 +80,17 @@ class InternalTimerServiceImpl(InternalTimerService[N]):
     Internal implementation of InternalTimerService.
     """
 
-    def __init__(self, keyed_state_backend: RemoteKeyedStateBackend):
+    def __init__(self, keyed_state_backend):
         self._keyed_state_backend = keyed_state_backend
         self._current_watermark = None
         self._timer_coder_impl = None
         self._output_stream = None
+
+        from apache_beam.transforms.window import GlobalWindow
+
         self._global_window = GlobalWindow()
 
-    def add_timer_info(self, timer_info: TimerInfo):
+    def add_timer_info(self, timer_info):
         self._timer_coder_impl = timer_info.timer_coder_impl
         self._output_stream = timer_info.output_stream
 
@@ -125,6 +123,8 @@ class InternalTimerServiceImpl(InternalTimerService[N]):
         self._set_timer(TimerOperandType.DELETE_EVENT_TIMER, ts, current_key, namespace)
 
     def _set_timer(self, timer_operation_type, ts, key, namespace):
+        from apache_beam.transforms import userstate
+
         bytes_io = BytesIO()
         self._namespace_serializer.serialize(namespace, bytes_io)
         encoded_namespace = bytes_io.getvalue()
diff --git a/flink-python/pyflink/fn_execution/datastream/window/window_operator.py b/flink-python/pyflink/fn_execution/datastream/window/window_operator.py
index 0ec8873..be172f2 100644
--- a/flink-python/pyflink/fn_execution/datastream/window/window_operator.py
+++ b/flink-python/pyflink/fn_execution/datastream/window/window_operator.py
@@ -28,7 +28,6 @@ from pyflink.datastream.window import MAX_LONG_VALUE
 from pyflink.fn_execution.datastream.window.merging_window_set import MergingWindowSet
 from pyflink.fn_execution.internal_state import InternalMergingState, InternalKvState, \
     InternalAppendingState
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
 from pyflink.metrics import MetricGroup
 
 T = TypeVar("T")
@@ -269,7 +268,7 @@ class WindowOperator(object):
 
     def __init__(self,
                  window_assigner: WindowAssigner,
-                 keyed_state_backend: RemoteKeyedStateBackend,
+                 keyed_state_backend,
                  user_key_selector,
                  window_state_descriptor: StateDescriptor,
                  window_function: InternalWindowFunction,
diff --git a/flink-python/pyflink/fn_execution/table/aggregate_fast.pyx b/flink-python/pyflink/fn_execution/table/aggregate_fast.pyx
index 265552b..2db06c7 100644
--- a/flink-python/pyflink/fn_execution/table/aggregate_fast.pyx
+++ b/flink-python/pyflink/fn_execution/table/aggregate_fast.pyx
@@ -26,7 +26,6 @@ from pyflink.common import Row
 from pyflink.fn_execution.coders import PickleCoder
 from pyflink.fn_execution.table.state_data_view import DataViewSpec, ListViewSpec, MapViewSpec, \
     PerKeyStateDataViewStore
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
 from pyflink.table import AggregateFunction, TableAggregateFunction
 
 cdef InternalRow join_row(list left, list right, InternalRowKind row_kind):
@@ -435,7 +434,7 @@ cdef class GroupAggFunctionBase:
     def __init__(self,
                  aggs_handle: AggsHandleFunctionBase,
                  key_selector: RowKeySelector,
-                 state_backend: RemoteKeyedStateBackend,
+                 state_backend,
                  state_value_coder,
                  generate_update_before: bool,
                  state_cleaning_enabled: bool,
@@ -479,7 +478,7 @@ cdef class GroupAggFunction(GroupAggFunctionBase):
     def __init__(self,
                  aggs_handle,
                  key_selector: RowKeySelector,
-                 state_backend: RemoteKeyedStateBackend,
+                 state_backend,
                  state_value_coder,
                  generate_update_before: bool,
                  state_cleaning_enabled: bool,
@@ -586,7 +585,7 @@ cdef class GroupTableAggFunction(GroupAggFunctionBase):
     def __init__(self,
                  aggs_handle,
                  key_selector: RowKeySelector,
-                 state_backend: RemoteKeyedStateBackend,
+                 state_backend,
                  state_value_coder,
                  generate_update_before: bool,
                  state_cleaning_enabled: bool,
diff --git a/flink-python/pyflink/fn_execution/table/aggregate_slow.py b/flink-python/pyflink/fn_execution/table/aggregate_slow.py
index d847d53..e10f6b0 100644
--- a/flink-python/pyflink/fn_execution/table/aggregate_slow.py
+++ b/flink-python/pyflink/fn_execution/table/aggregate_slow.py
@@ -22,7 +22,6 @@ from pyflink.common import Row, RowKind
 from pyflink.fn_execution.coders import PickleCoder
 from pyflink.fn_execution.table.state_data_view import DataViewSpec, ListViewSpec, MapViewSpec, \
     PerKeyStateDataViewStore
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
 from pyflink.table import AggregateFunction, FunctionContext, TableAggregateFunction
 from pyflink.table.udf import ImperativeAggregateFunction
 
@@ -409,7 +408,7 @@ class GroupAggFunctionBase(object):
     def __init__(self,
                  aggs_handle: AggsHandleFunctionBase,
                  key_selector: RowKeySelector,
-                 state_backend: RemoteKeyedStateBackend,
+                 state_backend,
                  state_value_coder,
                  generate_update_before: bool,
                  state_cleaning_enabled: bool,
@@ -455,7 +454,7 @@ class GroupAggFunction(GroupAggFunctionBase):
     def __init__(self,
                  aggs_handle: AggsHandleFunction,
                  key_selector: RowKeySelector,
-                 state_backend: RemoteKeyedStateBackend,
+                 state_backend,
                  state_value_coder,
                  generate_update_before: bool,
                  state_cleaning_enabled: bool,
@@ -550,7 +549,7 @@ class GroupTableAggFunction(GroupAggFunctionBase):
     def __init__(self,
                  aggs_handle: TableAggsHandleFunction,
                  key_selector: RowKeySelector,
-                 state_backend: RemoteKeyedStateBackend,
+                 state_backend,
                  state_value_coder,
                  generate_update_before: bool,
                  state_cleaning_enabled: bool,
diff --git a/flink-python/pyflink/fn_execution/table/operations.py b/flink-python/pyflink/fn_execution/table/operations.py
index ab0f72a..3f4069f 100644
--- a/flink-python/pyflink/fn_execution/table/operations.py
+++ b/flink-python/pyflink/fn_execution/table/operations.py
@@ -24,7 +24,6 @@ from pyflink.fn_execution.coders import DataViewFilterCoder, PickleCoder
 from pyflink.fn_execution.datastream.timerservice import InternalTimer
 from pyflink.fn_execution.datastream.operations import Operation
 from pyflink.fn_execution.datastream.timerservice_impl import TimerOperandType, InternalTimerImpl
-from pyflink.fn_execution import flink_fn_execution_pb2
 from pyflink.fn_execution.table.state_data_view import extract_data_view_specs
 
 from pyflink.fn_execution.table.window_assigner import TumblingWindowAssigner, \
@@ -76,9 +75,9 @@ class BundleOperation(object):
 
 
 class BaseOperation(Operation):
-    def __init__(self, spec):
-        super(BaseOperation, self).__init__(spec)
-        self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn)
+    def __init__(self, serialized_fn):
+        super(BaseOperation, self).__init__(serialized_fn)
+        self.func, self.user_defined_funcs = self.generate_func(serialized_fn)
 
     def process_element(self, value):
         return self.func(value)
@@ -99,8 +98,10 @@ class BaseOperation(Operation):
 
 
 class ScalarFunctionOperation(BaseOperation):
-    def __init__(self, spec):
-        super(ScalarFunctionOperation, self).__init__(spec)
+    def __init__(self, serialized_fn, one_arg_optimization=False, one_result_optimization=False):
+        self._one_arg_optimization = one_arg_optimization
+        self._one_result_optimization = one_result_optimization
+        super(ScalarFunctionOperation, self).__init__(serialized_fn)
 
     def generate_func(self, serialized_fn):
         """
@@ -114,14 +115,20 @@ class ScalarFunctionOperation(BaseOperation):
                 ','.join([x[0], y[0]]),
                 dict(chain(x[1].items(), y[1].items())),
                 x[2] + y[2]),
-            [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
-        generate_func = eval('lambda value: [%s]' % scalar_functions, variable_dict)
+            [operation_utils.extract_user_defined_function(
+                udf, one_arg_optimization=self._one_arg_optimization)
+                for udf in serialized_fn.udfs])
+        if self._one_result_optimization:
+            func_str = 'lambda value: %s' % scalar_functions
+        else:
+            func_str = 'lambda value: [%s]' % scalar_functions
+        generate_func = eval(func_str, variable_dict)
         return generate_func, user_defined_funcs
 
 
 class TableFunctionOperation(BaseOperation):
-    def __init__(self, spec):
-        super(TableFunctionOperation, self).__init__(spec)
+    def __init__(self, serialized_fn):
+        super(TableFunctionOperation, self).__init__(serialized_fn)
 
     def generate_func(self, serialized_fn):
         """
@@ -140,8 +147,8 @@ class TableFunctionOperation(BaseOperation):
 
 
 class PandasAggregateFunctionOperation(BaseOperation):
-    def __init__(self, spec):
-        super(PandasAggregateFunctionOperation, self).__init__(spec)
+    def __init__(self, serialized_fn):
+        super(PandasAggregateFunctionOperation, self).__init__(serialized_fn)
 
     def generate_func(self, serialized_fn):
         pandas_functions, variable_dict, user_defined_funcs = reduce(
@@ -158,13 +165,16 @@ class PandasAggregateFunctionOperation(BaseOperation):
 
 
 class PandasBatchOverWindowAggregateFunctionOperation(BaseOperation):
-    def __init__(self, spec):
-        super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(spec)
-        self.windows = [window for window in self.spec.serialized_fn.windows]
+    def __init__(self, serialized_fn):
+        super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(serialized_fn)
+        self.windows = [window for window in serialized_fn.windows]
         # the index among all the bounded range over window
         self.bounded_range_window_index = [-1 for _ in range(len(self.windows))]
         # Whether the specified position window is a bounded range window.
         self.is_bounded_range_window = []
+
+        from pyflink.fn_execution import flink_fn_execution_pb2
+
         window_types = flink_fn_execution_pb2.OverWindow
 
         bounded_range_window_nums = 0
@@ -193,6 +203,8 @@ class PandasBatchOverWindowAggregateFunctionOperation(BaseOperation):
 
     def wrapped_over_window_function(self, boundaries_series):
         import pandas as pd
+        from pyflink.fn_execution import flink_fn_execution_pb2
+
         OverWindow = flink_fn_execution_pb2.OverWindow
         input_series = boundaries_series[-1]
         # the row number of the arrow format data
@@ -264,9 +276,9 @@ class PandasBatchOverWindowAggregateFunctionOperation(BaseOperation):
 
 class BaseStatefulOperation(BaseOperation, abc.ABC):
 
-    def __init__(self, spec, keyed_state_backend):
+    def __init__(self, serialized_fn, keyed_state_backend):
         self.keyed_state_backend = keyed_state_backend
-        super(BaseStatefulOperation, self).__init__(spec)
+        super(BaseStatefulOperation, self).__init__(serialized_fn)
 
     def finish(self):
         super().finish()
@@ -282,19 +294,20 @@ REGISTER_PROCESSING_TIMER = 1
 
 class AbstractStreamGroupAggregateOperation(BaseStatefulOperation):
 
-    def __init__(self, spec, keyed_state_backend):
-        self.generate_update_before = spec.serialized_fn.generate_update_before
-        self.grouping = [i for i in spec.serialized_fn.grouping]
+    def __init__(self, serialized_fn, keyed_state_backend):
+        self.generate_update_before = serialized_fn.generate_update_before
+        self.grouping = [i for i in serialized_fn.grouping]
         self.group_agg_function = None
         # If the upstream generates retract message, we need to add an additional count1() agg
         # to track current accumulated messages count. If all the messages are retracted, we need
         # to send a DELETE message to downstream.
-        self.index_of_count_star = spec.serialized_fn.index_of_count_star
-        self.count_star_inserted = spec.serialized_fn.count_star_inserted
-        self.state_cache_size = spec.serialized_fn.state_cache_size
-        self.state_cleaning_enabled = spec.serialized_fn.state_cleaning_enabled
-        self.data_view_specs = extract_data_view_specs(spec.serialized_fn.udfs)
-        super(AbstractStreamGroupAggregateOperation, self).__init__(spec, keyed_state_backend)
+        self.index_of_count_star = serialized_fn.index_of_count_star
+        self.count_star_inserted = serialized_fn.count_star_inserted
+        self.state_cache_size = serialized_fn.state_cache_size
+        self.state_cleaning_enabled = serialized_fn.state_cleaning_enabled
+        self.data_view_specs = extract_data_view_specs(serialized_fn.udfs)
+        super(AbstractStreamGroupAggregateOperation, self).__init__(
+            serialized_fn, keyed_state_backend)
 
     def open(self):
         self.group_agg_function.open(FunctionContext(self.base_metric_group))
@@ -363,8 +376,8 @@ class AbstractStreamGroupAggregateOperation(BaseStatefulOperation):
 
 class StreamGroupAggregateOperation(AbstractStreamGroupAggregateOperation, BundleOperation):
 
-    def __init__(self, spec, keyed_state_backend):
-        super(StreamGroupAggregateOperation, self).__init__(spec, keyed_state_backend)
+    def __init__(self, serialized_fn, keyed_state_backend):
+        super(StreamGroupAggregateOperation, self).__init__(serialized_fn, keyed_state_backend)
 
     def finish_bundle(self):
         return self.group_agg_function.finish_bundle()
@@ -393,8 +406,8 @@ class StreamGroupAggregateOperation(AbstractStreamGroupAggregateOperation, Bundl
 
 
 class StreamGroupTableAggregateOperation(AbstractStreamGroupAggregateOperation, BundleOperation):
-    def __init__(self, spec, keyed_state_backend):
-        super(StreamGroupTableAggregateOperation, self).__init__(spec, keyed_state_backend)
+    def __init__(self, serialized_fn, keyed_state_backend):
+        super(StreamGroupTableAggregateOperation, self).__init__(serialized_fn, keyed_state_backend)
 
     def finish_bundle(self):
         return self.group_agg_function.finish_bundle()
@@ -420,17 +433,20 @@ class StreamGroupTableAggregateOperation(AbstractStreamGroupAggregateOperation,
 
 
 class StreamGroupWindowAggregateOperation(AbstractStreamGroupAggregateOperation):
-    def __init__(self, spec, keyed_state_backend):
-        self._window = spec.serialized_fn.group_window
+    def __init__(self, serialized_fn, keyed_state_backend):
+        self._window = serialized_fn.group_window
         self._named_property_extractor = self._create_named_property_function()
         self._is_time_window = None
         self._reuse_timer_data = Row()
         self._reuse_key_data = Row()
-        super(StreamGroupWindowAggregateOperation, self).__init__(spec, keyed_state_backend)
+        super(StreamGroupWindowAggregateOperation, self).__init__(
+            serialized_fn, keyed_state_backend)
 
     def create_process_function(self, user_defined_aggs, input_extractors, filter_args,
                                 distinct_indexes, distinct_view_descriptors, key_selector,
                                 state_value_coder):
+        from pyflink.fn_execution import flink_fn_execution_pb2
+
         self._is_time_window = self._window.is_time_window
         self._namespace_coder = self.keyed_state_backend._namespace_coder_impl
         if self._window.window_type == flink_fn_execution_pb2.GroupWindow.TUMBLING_GROUP_WINDOW:
@@ -515,6 +531,8 @@ class StreamGroupWindowAggregateOperation(AbstractStreamGroupAggregateOperation)
                 yield [NORMAL_RECORD, result_data, None]
 
     def _create_named_property_function(self):
+        from pyflink.fn_execution import flink_fn_execution_pb2
+
         named_property_extractor_array = []
         for named_property in self._window.namedProperties:
             if named_property == flink_fn_execution_pb2.GroupWindow.WINDOW_START:
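
To make the ScalarFunctionOperation change above concrete: with one_arg_optimization
the generated wrapper receives the input value directly instead of indexing into a
row, and with one_result_optimization it returns the bare result instead of a
one-element list. A simplified, hypothetical sketch of the two generated shapes (the
real code assembles the expression string from the UDF protos):

    # variable_dict plays the role of the lookup table built by
    # extract_user_defined_function; "add_one" is an illustrative UDF.
    variable_dict = {"add_one": lambda v: v + 1}

    # Default shape: the input is a row (list) and results are wrapped in a list.
    plain_func = eval("lambda value: [add_one(value[0])]", variable_dict)
    # With both optimizations enabled: single value in, single value out.
    optimized_func = eval("lambda value: add_one(value)", variable_dict)

    print(plain_func([41]))    # [42]
    print(optimized_func(41))  # 42
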
diff --git a/flink-python/pyflink/fn_execution/table/state_data_view.py b/flink-python/pyflink/fn_execution/table/state_data_view.py
index 0388e63..bc6c9dd 100644
--- a/flink-python/pyflink/fn_execution/table/state_data_view.py
+++ b/flink-python/pyflink/fn_execution/table/state_data_view.py
@@ -22,7 +22,6 @@ from pyflink.datastream.state import ListState, MapState
 from pyflink.fn_execution.coders import from_proto, PickleCoder
 from pyflink.fn_execution.internal_state import InternalListState, InternalMapState
 from pyflink.fn_execution.utils.operation_utils import is_built_in_function, load_aggregate_function
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
 from pyflink.table import FunctionContext
 from pyflink.table.data_view import ListView, MapView, DataView
 
@@ -230,7 +229,7 @@ class StateDataViewStore(ABC):
 
     def __init__(self,
                  function_context: FunctionContext,
-                 keyed_state_backend: RemoteKeyedStateBackend):
+                 keyed_state_backend):
         self._function_context = function_context
         self._keyed_state_backend = keyed_state_backend
 
@@ -268,7 +267,7 @@ class PerKeyStateDataViewStore(StateDataViewStore):
 
     def __init__(self,
                  function_context: FunctionContext,
-                 keyed_state_backend: RemoteKeyedStateBackend):
+                 keyed_state_backend):
         super(PerKeyStateDataViewStore, self).__init__(function_context, keyed_state_backend)
 
     def get_state_list_view(self, state_name, element_coder):
@@ -289,7 +288,7 @@ class PerWindowStateDataViewStore(StateDataViewStore):
 
     def __init__(self,
                  function_context: FunctionContext,
-                 keyed_state_backend: RemoteKeyedStateBackend):
+                 keyed_state_backend):
         super(PerWindowStateDataViewStore, self).__init__(function_context, keyed_state_backend)
 
     def get_state_list_view(self, state_name, element_coder):
diff --git a/flink-python/pyflink/fn_execution/table/window_aggregate_fast.pyx b/flink-python/pyflink/fn_execution/table/window_aggregate_fast.pyx
index 2704a4a..7a21249 100644
--- a/flink-python/pyflink/fn_execution/table/window_aggregate_fast.pyx
+++ b/flink-python/pyflink/fn_execution/table/window_aggregate_fast.pyx
@@ -34,7 +34,6 @@ from pyflink.fn_execution.datastream.timerservice_impl import LegacyInternalTime
 from pyflink.fn_execution.coders import PickleCoder
 from pyflink.fn_execution.table.state_data_view import DataViewSpec, ListViewSpec, MapViewSpec, \
     PerWindowStateDataViewStore
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
 from pyflink.fn_execution.table.window_assigner import WindowAssigner, PanedWindowAssigner, \
     MergingWindowAssigner
 from pyflink.fn_execution.table.window_context import WindowContext, TriggerContext, K, W
@@ -329,7 +328,7 @@ cdef class GroupWindowAggFunctionBase:
     def __init__(self,
                  allowed_lateness: int,
                  key_selector: RowKeySelector,
-                 state_backend: RemoteKeyedStateBackend,
+                 state_backend,
                  state_value_coder,
                  window_assigner: WindowAssigner[W],
                  window_aggregator: NamespaceAggsHandleFunctionBase,
@@ -510,7 +509,7 @@ cdef class GroupWindowAggFunction(GroupWindowAggFunctionBase):
     def __init__(self,
                  allowed_lateness: int,
                  key_selector: RowKeySelector,
-                 state_backend: RemoteKeyedStateBackend,
+                 state_backend,
                  state_value_coder,
                  window_assigner: WindowAssigner[W],
                  window_aggregator: NamespaceAggsHandleFunction[W],
diff --git a/flink-python/pyflink/fn_execution/table/window_aggregate_slow.py b/flink-python/pyflink/fn_execution/table/window_aggregate_slow.py
index 5ed146d..a7e557b 100644
--- a/flink-python/pyflink/fn_execution/table/window_aggregate_slow.py
+++ b/flink-python/pyflink/fn_execution/table/window_aggregate_slow.py
@@ -29,7 +29,6 @@ from pyflink.fn_execution.coders import PickleCoder
 from pyflink.fn_execution.table.aggregate_slow import DistinctViewDescriptor, RowKeySelector
 from pyflink.fn_execution.table.state_data_view import DataViewSpec, ListViewSpec, MapViewSpec, \
     PerWindowStateDataViewStore
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
 from pyflink.fn_execution.table.window_assigner import WindowAssigner, PanedWindowAssigner, \
     MergingWindowAssigner
 from pyflink.fn_execution.table.window_context import WindowContext, TriggerContext, K, W
@@ -289,7 +288,7 @@ class GroupWindowAggFunctionBase(Generic[K, W]):
     def __init__(self,
                  allowed_lateness: int,
                  key_selector: RowKeySelector,
-                 state_backend: RemoteKeyedStateBackend,
+                 state_backend,
                  state_value_coder,
                  window_assigner: WindowAssigner[W],
                  window_aggregator: NamespaceAggsHandleFunctionBase[W],
@@ -456,7 +455,7 @@ class GroupWindowAggFunction(GroupWindowAggFunctionBase[K, W]):
     def __init__(self,
                  allowed_lateness: int,
                  key_selector: RowKeySelector,
-                 state_backend: RemoteKeyedStateBackend,
+                 state_backend,
                  state_value_coder,
                  window_assigner: WindowAssigner[W],
                  window_aggregator: NamespaceAggsHandleFunction[W],
diff --git a/flink-python/pyflink/fn_execution/table/window_context.py b/flink-python/pyflink/fn_execution/table/window_context.py
index fe64f7f..2e1946e 100644
--- a/flink-python/pyflink/fn_execution/table/window_context.py
+++ b/flink-python/pyflink/fn_execution/table/window_context.py
@@ -19,15 +19,12 @@ import sys
 from abc import ABC, abstractmethod
 from typing import Generic, TypeVar, List, Iterable
 
-from apache_beam.coders import Coder
-
 from pyflink.datastream.state import StateDescriptor, State, ValueStateDescriptor, \
     ListStateDescriptor, MapStateDescriptor
 from pyflink.datastream.window import TimeWindow, CountWindow
 from pyflink.fn_execution.datastream.timerservice_impl import LegacyInternalTimerServiceImpl
 from pyflink.fn_execution.coders import from_type_info, MapCoder, GenericArrayCoder
 from pyflink.fn_execution.internal_state import InternalMergingState
-from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend
 
 MAX_LONG_VALUE = sys.maxsize
 
@@ -119,8 +116,8 @@ class WindowContext(Context[K, W]):
     def __init__(self,
                  window_operator,
                  trigger_context: 'TriggerContext',
-                 state_backend: RemoteKeyedStateBackend,
-                 state_value_coder: Coder,
+                 state_backend,
+                 state_value_coder,
                  timer_service: LegacyInternalTimerServiceImpl,
                  is_event_time: bool):
         self._window_operator = window_operator
@@ -183,7 +180,7 @@ class TriggerContext(object):
     def __init__(self,
                  trigger,
                  timer_service: LegacyInternalTimerServiceImpl[W],
-                 state_backend: RemoteKeyedStateBackend):
+                 state_backend):
         self._trigger = trigger
         self._timer_service = timer_service
         self._state_backend = state_backend
diff --git a/flink-python/pyflink/fn_execution/table/window_process_function.py b/flink-python/pyflink/fn_execution/table/window_process_function.py
index cfe0d2f..733d4c1 100644
--- a/flink-python/pyflink/fn_execution/table/window_process_function.py
+++ b/flink-python/pyflink/fn_execution/table/window_process_function.py
@@ -21,7 +21,6 @@ from typing import Generic, List, Iterable, Dict, Set
 
 from pyflink.common import Row
 from pyflink.datastream.state import MapState
-from pyflink.fn_execution.state_impl import LRUCache
 from pyflink.fn_execution.table.window_assigner import WindowAssigner, PanedWindowAssigner, \
     MergingWindowAssigner
 from pyflink.fn_execution.table.window_context import Context, K, W
@@ -236,6 +235,9 @@ class MergingWindowProcessFunction(InternalWindowProcessFunction[K, W]):
         self._window_mapping = None  # type: MapState
         self._state_backend = state_backend
         self._sorted_windows = None  # type: List
+
+        from pyflink.fn_execution.state_impl import LRUCache
+
         self._cached_sorted_windows = LRUCache(10000, None)
 
     def open(self, ctx: Context[K, W]):
diff --git a/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py b/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
index 119c887..c98a0b8 100644
--- a/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
+++ b/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
@@ -105,7 +105,7 @@ class PythonBootTests(PyFlinkTestCase):
 
     def test_set_working_directory(self):
         JProcessPythonEnvironmentManager = \
-            get_gateway().jvm.org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager
+            get_gateway().jvm.org.apache.flink.python.env.process.ProcessPythonEnvironmentManager
 
         output_file = os.path.join(self.tmp_dir, "output.txt")
         pyflink_dir = os.path.join(self.tmp_dir, "pyflink")
diff --git a/flink-python/pyflink/fn_execution/utils/operation_utils.py b/flink-python/pyflink/fn_execution/utils/operation_utils.py
index 9ed177d..3f47ca0 100644
--- a/flink-python/pyflink/fn_execution/utils/operation_utils.py
+++ b/flink-python/pyflink/fn_execution/utils/operation_utils.py
@@ -93,7 +93,8 @@ def extract_over_window_user_defined_function(user_defined_function_proto):
     return (*extract_user_defined_function(user_defined_function_proto, True), window_index)
 
 
-def extract_user_defined_function(user_defined_function_proto, pandas_udaf=False)\
+def extract_user_defined_function(user_defined_function_proto, pandas_udaf=False,
+                                  one_arg_optimization=False)\
         -> Tuple[str, Dict, List]:
     """
     Extracts user-defined-function from the proto representation of a
@@ -101,6 +102,7 @@ def extract_user_defined_function(user_defined_function_proto, pandas_udaf=False
 
     :param user_defined_function_proto: the proto representation of the Python
     :param pandas_udaf: whether the user_defined_function_proto is pandas udaf
+    :param one_arg_optimization: whether the optimization enabled
     :class:`UserDefinedFunction`
     """
 
@@ -116,13 +118,17 @@ def extract_user_defined_function(user_defined_function_proto, pandas_udaf=False
         for arg in args:
             if arg.HasField("udf"):
                 # for chaining Python UDF input: the input argument is a Python ScalarFunction
-                udf_arg, udf_variable_dict, udf_funcs = extract_user_defined_function(arg.udf)
+                udf_arg, udf_variable_dict, udf_funcs = extract_user_defined_function(
+                    arg.udf, one_arg_optimization=one_arg_optimization)
                 args_str.append(udf_arg)
                 local_variable_dict.update(udf_variable_dict)
                 local_funcs.extend(udf_funcs)
             elif arg.HasField("inputOffset"):
-                # the input argument is a column of the input row
-                args_str.append("value[%s]" % arg.inputOffset)
+                if one_arg_optimization:
+                    args_str.append("value")
+                else:
+                    # the input argument is a column of the input row
+                    args_str.append("value[%s]" % arg.inputOffset)
             else:
                 # the input argument is a constant value
                 constant_value_name, parsed_constant_value = \
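
For a UDF whose only input is column 0, the argument strings assembled above lead to generated
call expressions roughly of the following shape (illustrative only; the actual function name is
generated by the runner):

    f(value[0])   # one_arg_optimization disabled: the input row is indexed
    f(value)      # one_arg_optimization enabled: the single input value is passed as-is
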
@@ -275,6 +281,43 @@ def load_aggregate_function(payload):
         return pickle.loads(payload)
 
 
+def parse_function_proto(proto):
+    from pyflink.fn_execution import flink_fn_execution_pb2
+    serialized_fn = flink_fn_execution_pb2.UserDefinedFunctions()
+    serialized_fn.ParseFromString(proto)
+    return serialized_fn
+
+
+def deserialized_operation_from_serialized_bytes(b):
+    import cloudpickle
+    return cloudpickle.loads(b)
+
+
+def create_scalar_operation_from_proto(proto, one_arg_optimization=False,
+                                       one_result_optimization=False):
+    from pyflink.fn_execution.table.operations import ScalarFunctionOperation
+
+    serialized_fn = parse_function_proto(proto)
+    scalar_operation = ScalarFunctionOperation(
+        serialized_fn, one_arg_optimization, one_result_optimization)
+    return scalar_operation
+
+
+def create_serialized_scalar_operation_from_proto(proto, one_arg_optimization=False,
+                                                  one_result_optimization=False):
+    """
+    The CPython extension included in protobuf does not support being initialized multiple times,
+    so a single interpreter is made responsible for the initialization and the proto parsing. That
+    interpreter parses the proto and serializes the resulting function operation with cloudpickle
+    so that the other interpreters only need to deserialize it.
+    """
+
+    import cloudpickle
+
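+    # the proto may arrive from the JVM as a sequence of signed byte values (-128..127);
+    # b % 256 maps each of them back into the 0..255 range accepted by bytes()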
+    scalar_operation = create_scalar_operation_from_proto(
+        bytes(b % 256 for b in proto), one_arg_optimization, one_result_optimization)
+    return cloudpickle.dumps(scalar_operation)
+
+
 class PeriodicThread(threading.Thread):
     """Call a function periodically with the specified number of seconds"""
 
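A minimal sketch of how these helpers are meant to compose in the embedded runtime (the wiring
below is illustrative; `proto_bytes` stands for the serialized UserDefinedFunctions message handed
over from the Java operator): a single interpreter parses the proto and pickles the resulting
operation, and every other interpreter only unpickles it.

    from pyflink.fn_execution.utils import operation_utils

    # designated interpreter: parse the proto once and serialize the resulting operation
    serialized = operation_utils.create_serialized_scalar_operation_from_proto(
        proto_bytes, one_arg_optimization=True, one_result_optimization=True)

    # any other interpreter: rebuild the operation without touching the proto parser
    scalar_operation = operation_utils.deserialized_operation_from_serialized_bytes(serialized)
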
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index 22c3059..928842a 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -21,6 +21,8 @@ import sys
 import unittest
 import uuid
 
+import pytest
+
 from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
 from pyflink.table import expressions as expr
 from pyflink.table.udf import udf
@@ -63,6 +65,48 @@ class DependencyTests(object):
         actual = source_sink_utils.results()
         self.assert_equals(actual, ["+I[3, 1]", "+I[4, 2]", "+I[5, 3]"])
 
+    def test_add_python_archive(self):
+        tmp_dir = self.tempdir
+        archive_dir_path = os.path.join(tmp_dir, "archive_" + str(uuid.uuid4()))
+        os.mkdir(archive_dir_path)
+        with open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
+            f.write("2")
+        archive_file_path = \
+            shutil.make_archive(os.path.dirname(archive_dir_path), 'zip', archive_dir_path)
+        self.t_env.add_python_archive(archive_file_path, "data")
+
+        def add_from_file(i):
+            with open("data/data.txt", 'r') as f:
+                return i + int(f.read())
+
+        self.t_env.create_temporary_system_function("add_from_file",
+                                                    udf(add_from_file, DataTypes.BIGINT(),
+                                                        DataTypes.BIGINT()))
+        table_sink = source_sink_utils.TestAppendSink(
+            ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
+        self.t_env.register_table_sink("Results", table_sink)
+        t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
+        t.select(expr.call('add_from_file', t.a), t.a).execute_insert("Results").wait()
+
+        actual = source_sink_utils.results()
+        self.assert_equals(actual, ["+I[3, 1]", "+I[4, 2]", "+I[5, 3]"])
+
+
+@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
+class EmbeddedMultiThreadDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
+    def setUp(self):
+        super(EmbeddedMultiThreadDependencyTests, self).setUp()
+        self.t_env.get_config().get_configuration().set_string("python.execution-mode",
+                                                               "multi-thread")
+
+
+@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
+class EmbeddedSubInterpreterDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
+    def setUp(self):
+        super(EmbeddedSubInterpreterDependencyTests, self).setUp()
+        self.t_env.get_config().get_configuration().set_string("python.execution-mode",
+                                                               "sub-interpreter")
+
 
 class BatchDependencyTests(DependencyTests, PyFlinkBatchTableTestCase):
 
@@ -150,32 +194,6 @@ class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
         actual = source_sink_utils.results()
         self.assert_equals(actual, ["+I[2, 1]", "+I[3, 2]", "+I[4, 3]"])
 
-    def test_add_python_archive(self):
-        tmp_dir = self.tempdir
-        archive_dir_path = os.path.join(tmp_dir, "archive_" + str(uuid.uuid4()))
-        os.mkdir(archive_dir_path)
-        with open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
-            f.write("2")
-        archive_file_path = \
-            shutil.make_archive(os.path.dirname(archive_dir_path), 'zip', archive_dir_path)
-        self.t_env.add_python_archive(archive_file_path, "data")
-
-        def add_from_file(i):
-            with open("data/data.txt", 'r') as f:
-                return i + int(f.read())
-
-        self.t_env.create_temporary_system_function("add_from_file",
-                                                    udf(add_from_file, DataTypes.BIGINT(),
-                                                        DataTypes.BIGINT()))
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
-        t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
-        t.select(expr.call('add_from_file', t.a), t.a).execute_insert("Results").wait()
-
-        actual = source_sink_utils.results()
-        self.assert_equals(actual, ["+I[3, 1]", "+I[4, 2]", "+I[5, 3]"])
-
     def test_set_environment(self):
         python_exec_link_path = sys.executable
         self.st_env.get_config().set_python_executable(python_exec_link_path)
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index a885118..283143b 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -17,8 +17,10 @@
 ################################################################################
 import datetime
 import os
+import sys
 import unittest
 
+import pytest
 import pytz
 
 from pyflink.table import DataTypes, expressions as expr
@@ -52,8 +54,9 @@ class UserDefinedFunctionTests(object):
 
         # check memory limit is set
         @udf(result_type=DataTypes.BIGINT())
-        def check_memory_limit():
-            assert os.environ['_PYTHON_WORKER_MEMORY_LIMIT'] is not None
+        def check_memory_limit(exec_mode):
+            if exec_mode == "process":
+                assert os.environ['_PYTHON_WORKER_MEMORY_LIMIT'] is not None
             return 1
 
         table_sink = source_sink_utils.TestAppendSink(
@@ -62,10 +65,13 @@ class UserDefinedFunctionTests(object):
              DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()])
         self.t_env.register_table_sink("Results", table_sink)
 
+        execution_mode = self.t_env.get_config().get_configuration().get_string(
+            "python.execution-mode", "process")
+
         t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
         t.where(add_one(t.b) <= 3).select(
             add_one(t.a), subtract_one(t.b), add(t.a, t.c), add_one_callable(t.a),
-            add_one_partial(t.a), check_memory_limit(), t.a) \
+            add_one_partial(t.a), check_memory_limit(execution_mode), t.a) \
             .execute_insert("Results").wait()
         actual = source_sink_utils.results()
         self.assert_equals(actual, ["+I[2, 1, 4, 2, 2, 1, 1]", "+I[4, 0, 12, 4, 4, 1, 3]"])
@@ -217,7 +223,14 @@ class UserDefinedFunctionTests(object):
 
     def test_open(self):
         self.t_env.get_config().get_configuration().set_string('python.metric.enabled', 'true')
-        subtract = udf(Subtract(), result_type=DataTypes.BIGINT())
+        execution_mode = self.t_env.get_config().get_configuration().get_string(
+            "python.execution-mode", None)
+
+        if execution_mode == "process":
+            subtract = udf(SubtractWithMetrics(), result_type=DataTypes.BIGINT())
+        else:
+            subtract = udf(Subtract(), result_type=DataTypes.BIGINT())
+
         table_sink = source_sink_utils.TestAppendSink(
             ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
         self.t_env.register_table_sink("Results", table_sink)
@@ -783,6 +796,22 @@ class PyFlinkBatchUserDefinedFunctionTests(UserDefinedFunctionTests,
     pass
 
 
+@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
+class PyFlinkEmbeddedMultiThreadTests(UserDefinedFunctionTests, PyFlinkBatchTableTestCase):
+    def setUp(self):
+        super(PyFlinkEmbeddedMultiThreadTests, self).setUp()
+        self.t_env.get_config().get_configuration().set_string("python.execution-mode",
+                                                               "multi-thread")
+
+
+@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
+class PyFlinkEmbeddedSubInterpreterTests(UserDefinedFunctionTests, PyFlinkBatchTableTestCase):
+    def setUp(self):
+        super(PyFlinkEmbeddedSubInterpreterTests, self).setUp()
+        self.t_env.get_config().get_configuration().set_string("python.execution-mode",
+                                                               "sub-interpreter")
+
+
 # test specify the input_types
 @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
 def add(i, j):
@@ -795,7 +824,7 @@ class SubtractOne(ScalarFunction):
         return i - 1
 
 
-class Subtract(ScalarFunction, unittest.TestCase):
+class SubtractWithMetrics(ScalarFunction, unittest.TestCase):
 
     def open(self, function_context):
         self.subtracted_value = 1
@@ -810,6 +839,18 @@ class Subtract(ScalarFunction, unittest.TestCase):
         return i - self.subtracted_value
 
 
+class Subtract(ScalarFunction, unittest.TestCase):
+
+    def open(self, function_context):
+        self.subtracted_value = 1
+        self.counter_sum = 0
+
+    def eval(self, i):
+        # counter
+        self.counter_sum += i
+        return i - self.subtracted_value
+
+
 class CallablePlus(object):
 
     def __call__(self, col):
diff --git a/flink-python/setup.py b/flink-python/setup.py
index dae7e64..6bd3f97 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -313,6 +313,7 @@ try:
                           'pandas>=1.0,<1.2.0', 'pyarrow>=0.15.1,<3.0.0',
                           'pytz>=2018.3', 'numpy>=1.14.3,<1.20', 'fastavro>=0.21.4,<0.24',
                           'requests>=2.26.0', 'protobuf<3.18',
+                          'pemja==0.1.2;python_full_version >= "3.7"',
                           apache_flink_libraries_dependency],
         cmdclass={'build_ext': build_ext},
         tests_require=['pytest==4.4.1'],
diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java b/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
index d862167..5195f1f 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
@@ -89,6 +89,9 @@ public class PythonConfig implements Serializable {
     /** Whether profile is enabled. */
     private final boolean profileEnabled;
 
+    /** Execution Mode. */
+    private final String executionMode;
+
     public PythonConfig(Configuration config) {
         this.config = config;
         maxBundleSize = config.get(PythonOptions.MAX_BUNDLE_SIZE);
@@ -110,6 +113,7 @@ public class PythonConfig implements Serializable {
         metricEnabled = config.getBoolean(PythonOptions.PYTHON_METRIC_ENABLED);
         isUsingManagedMemory = config.getBoolean(PythonOptions.USE_MANAGED_MEMORY);
         profileEnabled = config.getBoolean(PythonOptions.PYTHON_PROFILE_ENABLED);
+        executionMode = config.getString(PythonOptions.PYTHON_EXECUTION_MODE);
     }
 
     public int getMaxBundleSize() {
@@ -144,6 +148,10 @@ public class PythonConfig implements Serializable {
         return pythonExec;
     }
 
+    public String getExecutionMode() {
+        return executionMode;
+    }
+
     public boolean isMetricEnabled() {
         return metricEnabled;
     }
diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index de68482..874a8d0 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -214,4 +214,17 @@ public class PythonOptions {
                                     + "in each batch when iterating a Python MapState. Note that this is an experimental flag "
                                     + "and might not be available "
                                     + "in future releases.");
+
+    /** Specify the python runtime execution mode. */
+    @Experimental
+    public static final ConfigOption<String> PYTHON_EXECUTION_MODE =
+            ConfigOptions.key("python.execution-mode")
+                    .defaultValue("process")
+                    .withDescription(
+                            "Specify the Python runtime execution mode. The optional values are `process`, `multi-thread` and `sub-interpreter`. "
+                                    + "The `process` mode means that the Python user-defined functions will be executed in a separate Python process. "
+                                    + "The `multi-thread` mode means that the Python user-defined functions will be executed in the same thread as the Java operator, but it will be affected by the GIL. "
+                                    + "The `sub-interpreter` mode means that the Python user-defined functions will be executed in different Python sub-interpreters rather than in different threads of one interpreter, "
+                                    + "which can largely overcome the effects of the GIL, but it may fail with some CPython extension libraries, such as numpy and tensorflow. "
+                                    + "Note that if a Python operator does not support the `multi-thread` and `sub-interpreter` modes, the `process` mode will still be used.");
 }
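
For reference, the new option can be set from PyFlink like any other Python option (this mirrors
the test setup added elsewhere in this commit; the environment creation below is shown only for
completeness):

    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    t_env.get_config().get_configuration().set_string(
        "python.execution-mode", "multi-thread")  # or "sub-interpreter" / "process"
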
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
similarity index 74%
rename from flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java
rename to flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
index ac63769..2ab0fac 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/AbstractPythonEnvironmentManager.java
@@ -16,16 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.python.env.beam;
+package org.apache.flink.python.env;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.python.env.ProcessPythonEnvironment;
-import org.apache.flink.python.env.PythonDependencyInfo;
-import org.apache.flink.python.env.PythonEnvironment;
-import org.apache.flink.python.env.PythonEnvironmentManager;
 import org.apache.flink.python.util.CompressionUtils;
 import org.apache.flink.python.util.PythonEnvironmentManagerUtils;
 import org.apache.flink.util.FileUtils;
@@ -40,11 +36,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
 
-import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -61,17 +54,30 @@ import java.util.concurrent.locks.ReentrantLock;
 import static org.apache.flink.python.util.PythonDependencyUtils.PARAM_DELIMITER;
 
 /**
- * The ProcessPythonEnvironmentManager is used to prepare the working dir of python UDF worker and
- * create ProcessPythonEnvironment object of Beam Fn API. It's used when the python function runner
- * is configured to run python UDF in process mode.
+ * The base class of Python environment managers, which are used to create the PythonEnvironment
+ * object for executing Python functions.
  */
 @Internal
-public final class ProcessPythonEnvironmentManager implements PythonEnvironmentManager {
+public abstract class AbstractPythonEnvironmentManager implements PythonEnvironmentManager {
 
     private static final Logger LOG =
-            LoggerFactory.getLogger(ProcessPythonEnvironmentManager.class);
+            LoggerFactory.getLogger(AbstractPythonEnvironmentManager.class);
 
-    @VisibleForTesting static final String PYFLINK_GATEWAY_DISABLED = "PYFLINK_GATEWAY_DISABLED";
+    private static final long CHECK_INTERVAL = 20;
+    private static final long CHECK_TIMEOUT = 1000;
+
+    private transient Thread shutdownHook;
+
+    protected transient PythonLeasedResource resource;
+
+    protected final PythonDependencyInfo dependencyInfo;
+
+    private final Map<String, String> systemEnv;
+
+    private final String[] tmpDirectories;
+    private final JobID jobID;
+
+    @VisibleForTesting public static final String PYTHON_REQUIREMENTS_DIR = "python-requirements";
 
     @VisibleForTesting
     public static final String PYTHON_REQUIREMENTS_FILE = "_PYTHON_REQUIREMENTS_FILE";
@@ -84,23 +90,14 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
 
     @VisibleForTesting public static final String PYTHON_WORKING_DIR = "_PYTHON_WORKING_DIR";
 
-    @VisibleForTesting static final String PYTHON_REQUIREMENTS_DIR = "python-requirements";
-    @VisibleForTesting static final String PYTHON_ARCHIVES_DIR = "python-archives";
-    @VisibleForTesting static final String PYTHON_FILES_DIR = "python-files";
-
-    private static final long CHECK_INTERVAL = 20;
-    private static final long CHECK_TIMEOUT = 1000;
-
-    private transient Thread shutdownHook;
+    @VisibleForTesting public static final String PYTHON_FILES_DIR = "python-files";
 
-    private transient PythonEnvResources.PythonLeasedResource resource;
+    @VisibleForTesting public static final String PYTHON_ARCHIVES_DIR = "python-archives";
 
-    private final PythonDependencyInfo dependencyInfo;
-    private final Map<String, String> systemEnv;
-    private final String[] tmpDirectories;
-    private final JobID jobID;
+    @VisibleForTesting
+    public static final String PYFLINK_GATEWAY_DISABLED = "PYFLINK_GATEWAY_DISABLED";
 
-    public ProcessPythonEnvironmentManager(
+    public AbstractPythonEnvironmentManager(
             PythonDependencyInfo dependencyInfo,
             String[] tmpDirectories,
             Map<String, String> systemEnv,
@@ -131,7 +128,7 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
                         });
         shutdownHook =
                 ShutdownHookUtil.addShutdownHook(
-                        this, ProcessPythonEnvironmentManager.class.getSimpleName(), LOG);
+                        this, AbstractPythonEnvironmentManager.class.getSimpleName(), LOG);
     }
 
     @Override
@@ -141,68 +138,38 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
         } finally {
             if (shutdownHook != null) {
                 ShutdownHookUtil.removeShutdownHook(
-                        shutdownHook, ProcessPythonEnvironmentManager.class.getSimpleName(), LOG);
+                        shutdownHook, AbstractPythonEnvironmentManager.class.getSimpleName(), LOG);
                 shutdownHook = null;
             }
         }
     }
 
-    @Override
-    public PythonEnvironment createEnvironment() throws Exception {
-        HashMap<String, String> env = new HashMap<>(resource.env);
-
-        String runnerScript =
-                PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(
-                        dependencyInfo.getPythonExec(), env);
-
-        return new ProcessPythonEnvironment(runnerScript, env);
+    @VisibleForTesting
+    public String getBaseDirectory() {
+        return resource.baseDirectory;
     }
 
-    /**
-     * Returns an empty RetrievalToken because no files will be transmit via ArtifactService in
-     * process mode.
-     *
-     * @return The path of empty RetrievalToken.
-     */
-    @Override
-    public String createRetrievalToken() throws IOException {
-        File retrievalToken =
-                new File(
-                        resource.baseDirectory,
-                        "retrieval_token_" + UUID.randomUUID().toString() + ".json");
-        if (retrievalToken.createNewFile()) {
-            final DataOutputStream dos = new DataOutputStream(new FileOutputStream(retrievalToken));
-            dos.writeBytes("{\"manifest\": {}}");
-            dos.flush();
-            dos.close();
-            return retrievalToken.getAbsolutePath();
-        } else {
-            throw new IOException(
-                    "Could not create the RetrievalToken file: "
-                            + retrievalToken.getAbsolutePath());
-        }
+    @VisibleForTesting
+    public Map<String, String> getPythonEnv() {
+        return resource.env;
     }
 
     /**
      * Constructs the environment variables which is used to launch the python UDF worker.
      *
-     * <p>To avoid unnecessary IO, the artifacts will not be transmitted via the ArtifactService of
-     * Beam when running in process mode. Instead, the paths of the artifacts will be passed to the
-     * Python UDF worker directly.
-     *
      * @return The environment variables which contain the paths of the python dependencies.
      */
     @VisibleForTesting
-    Map<String, String> constructEnvironmentVariables(String baseDirectory)
-            throws IOException, IllegalArgumentException {
+    public Map<String, String> constructEnvironmentVariables(String baseDirectory)
+            throws IOException {
         Map<String, String> env = new HashMap<>(this.systemEnv);
 
         constructFilesDirectory(env, baseDirectory);
 
-        constructArchivesDirectory(env, baseDirectory);
-
         constructRequirementsDirectory(env, baseDirectory);
 
+        constructArchivesDirectory(env, baseDirectory);
+
         // set BOOT_LOG_DIR.
         env.put("BOOT_LOG_DIR", baseDirectory);
 
@@ -220,8 +187,26 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
         return env;
     }
 
-    @VisibleForTesting
-    void installRequirements(String baseDirectory, Map<String, String> env) throws IOException {
+    private static String createBaseDirectory(String[] tmpDirectories) throws IOException {
+        Random rnd = new Random();
+        // try to find a unique file name for the base directory
+        int maxAttempts = 10;
+        for (int attempt = 0; attempt < maxAttempts; attempt++) {
+            String directory = tmpDirectories[rnd.nextInt(tmpDirectories.length)];
+            File baseDirectory = new File(directory, "python-dist-" + UUID.randomUUID().toString());
+            if (baseDirectory.mkdirs()) {
+                return baseDirectory.getAbsolutePath();
+            }
+        }
+
+        throw new IOException(
+                "Could not find a unique directory name in '"
+                        + Arrays.toString(tmpDirectories)
+                        + "' for storing the generated files of python dependency.");
+    }
+
+    private void installRequirements(String baseDirectory, Map<String, String> env)
+            throws IOException {
         // Directory for storing the installation result of the requirements file.
         String requirementsDirectory =
                 String.join(File.separator, baseDirectory, PYTHON_REQUIREMENTS_DIR);
@@ -236,10 +221,6 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
         }
     }
 
-    public void setEnvironmentVariable(String key, String value) {
-        this.systemEnv.put(key, value);
-    }
-
     private void constructFilesDirectory(Map<String, String> env, String baseDirectory)
             throws IOException {
         // link or copy python files to filesDirectory and add them to PYTHONPATH
@@ -316,44 +297,8 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
         LOG.info("PYTHONPATH of python worker: {}", env.get("PYTHONPATH"));
     }
 
-    private void constructArchivesDirectory(Map<String, String> env, String baseDirectory)
-            throws IOException {
-        // Directory for storing the extracted result of the archive files.
-        String archivesDirectory = String.join(File.separator, baseDirectory, PYTHON_ARCHIVES_DIR);
-
-        if (!dependencyInfo.getArchives().isEmpty()) {
-            // set the archives directory as the working directory, then user could access the
-            // content of the archives via relative path
-            env.put(PYTHON_WORKING_DIR, archivesDirectory);
-            LOG.info("Python working dir of python worker: {}", archivesDirectory);
-
-            // extract archives to archives directory
-            for (Map.Entry<String, String> entry : dependencyInfo.getArchives().entrySet()) {
-                String srcFilePath = entry.getKey();
-
-                String originalFileName;
-                String targetDirName;
-                if (entry.getValue().contains(PARAM_DELIMITER)) {
-                    String[] filePathAndTargetDir = entry.getValue().split(PARAM_DELIMITER, 2);
-                    originalFileName = filePathAndTargetDir[0];
-                    targetDirName = filePathAndTargetDir[1];
-                } else {
-                    originalFileName = entry.getValue();
-                    targetDirName = originalFileName;
-                }
-
-                String targetDirPath =
-                        String.join(File.separator, archivesDirectory, targetDirName);
-                CompressionUtils.extractFile(srcFilePath, targetDirPath, originalFileName);
-            }
-        }
-    }
-
     private void constructRequirementsDirectory(Map<String, String> env, String baseDirectory)
             throws IOException {
-        // set the requirements file and the dependencies specified by the requirements file will be
-        // installed in
-        // boot.py during initialization
         String requirementsDirectory =
                 String.join(File.separator, baseDirectory, PYTHON_REQUIREMENTS_DIR);
         if (dependencyInfo.getRequirementsFilePath().isPresent()) {
@@ -377,34 +322,42 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
                         dependencyInfo.getRequirementsCacheDir().get());
             }
 
-            // the dependencies specified by the requirements file will be installed into this
-            // directory, and will be
-            // added to PYTHONPATH in boot.py
             env.put(PYTHON_REQUIREMENTS_INSTALL_DIR, requirementsDirectory);
             LOG.info("Requirements install directory of python worker: {}", requirementsDirectory);
         }
     }
 
-    @VisibleForTesting
-    String getBaseDirectory() {
-        return resource.baseDirectory;
-    }
+    private void constructArchivesDirectory(Map<String, String> env, String baseDirectory)
+            throws IOException {
+        // Directory for storing the extracted result of the archive files.
+        String archivesDirectory = String.join(File.separator, baseDirectory, PYTHON_ARCHIVES_DIR);
 
-    @VisibleForTesting
-    Map<String, String> getPythonEnv() {
-        return resource.env;
-    }
+        if (!dependencyInfo.getArchives().isEmpty()) {
+            // set the archives directory as the working directory, then user could access the
+            // content of the archives via relative path
+            env.put(PYTHON_WORKING_DIR, archivesDirectory);
+            LOG.info("Python working dir of python worker: {}", archivesDirectory);
 
-    @Override
-    public String getBootLog() throws Exception {
-        File bootLogFile =
-                new File(resource.baseDirectory + File.separator + "flink-python-udf-boot.log");
-        String msg = "Failed to create stage bundle factory!";
-        if (bootLogFile.exists()) {
-            byte[] output = Files.readAllBytes(bootLogFile.toPath());
-            msg += String.format(" %s", new String(output, Charset.defaultCharset()));
+            // extract archives to archives directory
+            for (Map.Entry<String, String> entry : dependencyInfo.getArchives().entrySet()) {
+                String srcFilePath = entry.getKey();
+
+                String originalFileName;
+                String targetDirName;
+                if (entry.getValue().contains(PARAM_DELIMITER)) {
+                    String[] filePathAndTargetDir = entry.getValue().split(PARAM_DELIMITER, 2);
+                    originalFileName = filePathAndTargetDir[0];
+                    targetDirName = filePathAndTargetDir[1];
+                } else {
+                    originalFileName = entry.getValue();
+                    targetDirName = originalFileName;
+                }
+
+                String targetDirPath =
+                        String.join(File.separator, archivesDirectory, targetDirName);
+                CompressionUtils.extractFile(srcFilePath, targetDirPath, originalFileName);
+            }
         }
-        return msg;
     }
 
     private static void appendToPythonPath(
@@ -424,24 +377,6 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
         }
     }
 
-    private static String createBaseDirectory(String[] tmpDirectories) throws IOException {
-        Random rnd = new Random();
-        // try to find a unique file name for the base directory
-        int maxAttempts = 10;
-        for (int attempt = 0; attempt < maxAttempts; attempt++) {
-            String directory = tmpDirectories[rnd.nextInt(tmpDirectories.length)];
-            File baseDirectory = new File(directory, "python-dist-" + UUID.randomUUID().toString());
-            if (baseDirectory.mkdirs()) {
-                return baseDirectory.getAbsolutePath();
-            }
-        }
-
-        throw new IOException(
-                "Could not find a unique directory name in '"
-                        + Arrays.toString(tmpDirectories)
-                        + "' for storing the generated files of python dependency.");
-    }
-
     private static final class PythonEnvResources {
 
         private static final ReentrantLock lock = new ReentrantLock();
@@ -502,53 +437,57 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
             Map<String, String> env = resource.f1;
             return new PythonLeasedResource(baseDirectory, env);
         }
+    }
 
-        private static final class PythonLeasedResource implements AutoCloseable {
-            private final Map<String, String> env;
+    /**
+     * A leased Python resource which holds the environment variables and the working directory of
+     * the Python execution environment.
+     */
+    public static final class PythonLeasedResource implements AutoCloseable {
+        public final Map<String, String> env;
 
-            /** The base directory of the Python Environment. */
-            private final String baseDirectory;
+        /** The base directory of the Python Environment. */
+        public final String baseDirectory;
 
-            /** Keep track of the number of threads sharing this Python environment resources. */
-            private long refCount = 0;
+        /** Keep track of the number of threads sharing this Python environment resources. */
+        private long refCount = 0;
 
-            PythonLeasedResource(String baseDirectory, Map<String, String> env) {
-                this.baseDirectory = baseDirectory;
-                this.env = env;
-            }
+        PythonLeasedResource(String baseDirectory, Map<String, String> env) {
+            this.baseDirectory = baseDirectory;
+            this.env = env;
+        }
 
-            void incRef() {
-                this.refCount += 1;
-            }
+        void incRef() {
+            this.refCount += 1;
+        }
 
-            void decRef() {
-                Preconditions.checkState(refCount > 0);
-                this.refCount -= 1;
-            }
+        void decRef() {
+            Preconditions.checkState(refCount > 0);
+            this.refCount -= 1;
+        }
 
-            @Override
-            public void close() throws Exception {
-                int retries = 0;
-                while (true) {
-                    try {
-                        FileUtils.deleteDirectory(new File(baseDirectory));
+        @Override
+        public void close() throws Exception {
+            int retries = 0;
+            while (true) {
+                try {
+                    FileUtils.deleteDirectory(new File(baseDirectory));
+                    break;
+                } catch (Throwable t) {
+                    retries++;
+                    if (retries <= CHECK_TIMEOUT / CHECK_INTERVAL) {
+                        LOG.warn(
+                                String.format(
+                                        "Failed to delete the working directory %s of the Python UDF worker. Retrying...",
+                                        baseDirectory),
+                                t);
+                    } else {
+                        LOG.warn(
+                                String.format(
+                                        "Failed to delete the working directory %s of the Python UDF worker.",
+                                        baseDirectory),
+                                t);
                         break;
-                    } catch (Throwable t) {
-                        retries++;
-                        if (retries <= CHECK_TIMEOUT / CHECK_INTERVAL) {
-                            LOG.warn(
-                                    String.format(
-                                            "Failed to delete the working directory %s of the Python UDF worker. Retrying...",
-                                            baseDirectory),
-                                    t);
-                        } else {
-                            LOG.warn(
-                                    String.format(
-                                            "Failed to delete the working directory %s of the Python UDF worker.",
-                                            baseDirectory),
-                                    t);
-                            break;
-                        }
                     }
                 }
             }
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java b/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java
index b44ae2c..4ef6852 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/PythonDependencyInfo.java
@@ -21,6 +21,7 @@ package org.apache.flink.python.env;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.python.PythonConfig;
+import org.apache.flink.python.PythonOptions;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -69,17 +70,37 @@ public final class PythonDependencyInfo {
      */
     @Nonnull private final String pythonExec;
 
+    /** Execution Mode. */
+    @Nonnull private final String executionMode;
+
     public PythonDependencyInfo(
             @Nonnull Map<String, String> pythonFiles,
             @Nullable String requirementsFilePath,
             @Nullable String requirementsCacheDir,
             @Nonnull Map<String, String> archives,
             @Nonnull String pythonExec) {
+        this(
+                pythonFiles,
+                requirementsFilePath,
+                requirementsCacheDir,
+                archives,
+                pythonExec,
+                PythonOptions.PYTHON_EXECUTION_MODE.defaultValue());
+    }
+
+    public PythonDependencyInfo(
+            @Nonnull Map<String, String> pythonFiles,
+            @Nullable String requirementsFilePath,
+            @Nullable String requirementsCacheDir,
+            @Nonnull Map<String, String> archives,
+            @Nonnull String pythonExec,
+            @Nonnull String executionMode) {
         this.pythonFiles = Objects.requireNonNull(pythonFiles);
         this.requirementsFilePath = requirementsFilePath;
         this.requirementsCacheDir = requirementsCacheDir;
         this.pythonExec = Objects.requireNonNull(pythonExec);
         this.archives = Objects.requireNonNull(archives);
+        this.executionMode = Objects.requireNonNull(executionMode);
     }
 
     public Map<String, String> getPythonFiles() {
@@ -102,6 +123,10 @@ public final class PythonDependencyInfo {
         return archives;
     }
 
+    public String getExecutionMode() {
+        return executionMode;
+    }
+
     /**
      * Creates PythonDependencyInfo from GlobalJobParameters and DistributedCache.
      *
@@ -144,6 +169,11 @@ public final class PythonDependencyInfo {
         String pythonExec = pythonConfig.getPythonExec();
 
         return new PythonDependencyInfo(
-                pythonFiles, requirementsFilePath, requirementsCacheDir, archives, pythonExec);
+                pythonFiles,
+                requirementsFilePath,
+                requirementsCacheDir,
+                archives,
+                pythonExec,
+                pythonConfig.getExecutionMode());
     }
 }
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/PythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/PythonEnvironmentManager.java
index ddd8560..b317edb 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/PythonEnvironmentManager.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/PythonEnvironmentManager.java
@@ -19,11 +19,10 @@
 package org.apache.flink.python.env;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.python.PythonFunctionRunner;
 
 /**
  * The base interface of python environment manager which is used to create the PythonEnvironment
- * object and the RetrievalToken.
+ * object for executing Python functions.
  */
 @Internal
 public interface PythonEnvironmentManager extends AutoCloseable {
@@ -32,21 +31,10 @@ public interface PythonEnvironmentManager extends AutoCloseable {
     void open() throws Exception;
 
     /**
-     * Creates the PythonEnvironment object used in {@link PythonFunctionRunner}.
+     * Creates the PythonEnvironment object used to execute Python functions.
      *
-     * @return The PythonEnvironment object which represents the environment(process, docker, etc)
-     *     the python worker would run in.
+     * @return The PythonEnvironment object which represents the environment (embedded thread,
+     *     process, docker, etc.) the Python worker would run in.
      */
     PythonEnvironment createEnvironment() throws Exception;
-
-    /**
-     * Creates the RetrievalToken used in {@link PythonFunctionRunner}. It contains a list of files
-     * which need to transmit through ArtifactService provided by {@link PythonFunctionRunner}.
-     *
-     * @return The path of the RetrievalToken file.
-     */
-    String createRetrievalToken() throws Exception;
-
-    /** Returns the boot log of the Python Environment. */
-    String getBootLog() throws Exception;
 }
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/ProcessPythonEnvironment.java b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironment.java
similarity index 65%
copy from flink-python/src/main/java/org/apache/flink/python/env/ProcessPythonEnvironment.java
copy to flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironment.java
index 4aa8099..bfb020a 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/ProcessPythonEnvironment.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironment.java
@@ -16,28 +16,31 @@
  * limitations under the License.
  */
 
-package org.apache.flink.python.env;
+package org.apache.flink.python.env.embedded;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.python.env.PythonEnvironment;
+
+import pemja.core.PythonInterpreterConfig;
 
 import java.util.Map;
 
-/** A {@link PythonEnvironment} for executing UDFs in Process. */
+/** A {@link PythonEnvironment} for executing UDFs in an embedded environment. */
 @Internal
-public class ProcessPythonEnvironment implements PythonEnvironment {
-    private final String command;
+public class EmbeddedPythonEnvironment implements PythonEnvironment {
+    private final PythonInterpreterConfig config;
     private final Map<String, String> env;
 
-    public ProcessPythonEnvironment(String command, Map<String, String> env) {
-        this.command = command;
+    public EmbeddedPythonEnvironment(PythonInterpreterConfig config, Map<String, String> env) {
+        this.config = config;
         this.env = env;
     }
 
-    public Map<String, String> getEnv() {
-        return env;
+    public PythonInterpreterConfig getConfig() {
+        return config;
     }
 
-    public String getCommand() {
-        return command;
+    public Map<String, String> getEnv() {
+        return env;
     }
 }
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
new file mode 100644
index 0000000..add296a
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.env.embedded;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.python.env.AbstractPythonEnvironmentManager;
+import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.python.env.PythonEnvironment;
+
+import pemja.core.PythonInterpreterConfig;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The Python environment manager used to create the EmbeddedPythonEnvironment object. It's used
+ * to run Python UDFs in an embedded Python environment.
+ */
+@Internal
+public class EmbeddedPythonEnvironmentManager extends AbstractPythonEnvironmentManager {
+
+    public EmbeddedPythonEnvironmentManager(
+            PythonDependencyInfo dependencyInfo,
+            String[] tmpDirectories,
+            Map<String, String> systemEnv,
+            JobID jobID) {
+        super(dependencyInfo, tmpDirectories, systemEnv, jobID);
+    }
+
+    @Override
+    public PythonEnvironment createEnvironment() throws Exception {
+        Map<String, String> env = new HashMap<>(getPythonEnv());
+
+        PythonInterpreterConfig.ExecType execType;
+
+        String executionMode = dependencyInfo.getExecutionMode();
+
+        if (executionMode.equalsIgnoreCase("sub-interpreter")) {
+            execType = PythonInterpreterConfig.ExecType.SUB_INTERPRETER;
+        } else if (executionMode.equalsIgnoreCase("multi-thread")) {
+            execType = PythonInterpreterConfig.ExecType.MULTI_THREAD;
+        } else {
+            throw new RuntimeException(
+                    String.format("Unsupported execution mode %s.", executionMode));
+        }
+
+        if (env.containsKey("FLINK_TESTING")) {
+            String flinkHome = env.get("FLINK_HOME");
+            String sourceRootDir = new File(flinkHome, "../../../../").getCanonicalPath();
+            String flinkPython = sourceRootDir + "/flink-python";
+            // add flink-python of source code to PYTHONPATH
+            env.put(
+                    "PYTHONPATH",
+                    flinkPython + File.pathSeparator + env.getOrDefault("PYTHONPATH", ""));
+        }
+
+        PythonInterpreterConfig interpreterConfig =
+                PythonInterpreterConfig.newBuilder()
+                        .setPythonExec(dependencyInfo.getPythonExec())
+                        .setExcType(execType)
+                        .addPythonPaths(env.getOrDefault("PYTHONPATH", ""))
+                        .build();
+
+        return new EmbeddedPythonEnvironment(interpreterConfig, env);
+    }
+}
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/ProcessPythonEnvironment.java b/flink-python/src/main/java/org/apache/flink/python/env/process/ProcessPythonEnvironment.java
similarity index 93%
rename from flink-python/src/main/java/org/apache/flink/python/env/ProcessPythonEnvironment.java
rename to flink-python/src/main/java/org/apache/flink/python/env/process/ProcessPythonEnvironment.java
index 4aa8099..86c13f9 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/ProcessPythonEnvironment.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/process/ProcessPythonEnvironment.java
@@ -16,9 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.python.env;
+package org.apache.flink.python.env.process;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.python.env.PythonEnvironment;
 
 import java.util.Map;
 
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/process/ProcessPythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/process/ProcessPythonEnvironmentManager.java
new file mode 100644
index 0000000..b316e06
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/python/env/process/ProcessPythonEnvironmentManager.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.env.process;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.python.env.AbstractPythonEnvironmentManager;
+import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.python.env.PythonEnvironment;
+import org.apache.flink.python.util.PythonEnvironmentManagerUtils;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * The ProcessPythonEnvironmentManager is used to prepare the working directory of the Python UDF
+ * worker and to create the ProcessPythonEnvironment object of the Beam Fn API. It's used when the
+ * Python function runner is configured to run Python UDFs in process mode.
+ */
+@Internal
+public final class ProcessPythonEnvironmentManager extends AbstractPythonEnvironmentManager {
+
+    public ProcessPythonEnvironmentManager(
+            PythonDependencyInfo dependencyInfo,
+            String[] tmpDirectories,
+            Map<String, String> systemEnv,
+            JobID jobID) {
+        super(dependencyInfo, tmpDirectories, systemEnv, jobID);
+    }
+
+    @Override
+    public PythonEnvironment createEnvironment() throws Exception {
+        HashMap<String, String> env = new HashMap<>(resource.env);
+
+        String runnerScript =
+                PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(
+                        dependencyInfo.getPythonExec(), env);
+
+        return new ProcessPythonEnvironment(runnerScript, env);
+    }
+
+    /**
+     * Returns an empty RetrievalToken because no files will be transmitted via the ArtifactService
+     * process mode.
+     *
+     * @return The path of empty RetrievalToken.
+     */
+    public String createRetrievalToken() throws IOException {
+        File retrievalToken =
+                new File(
+                        resource.baseDirectory,
+                        "retrieval_token_" + UUID.randomUUID().toString() + ".json");
+        if (retrievalToken.createNewFile()) {
+            final DataOutputStream dos = new DataOutputStream(new FileOutputStream(retrievalToken));
+            dos.writeBytes("{\"manifest\": {}}");
+            dos.flush();
+            dos.close();
+            return retrievalToken.getAbsolutePath();
+        } else {
+            throw new IOException(
+                    "Could not create the RetrievalToken file: "
+                            + retrievalToken.getAbsolutePath());
+        }
+    }
+
+    /** Returns the boot log of the Python Environment. */
+    public String getBootLog() throws Exception {
+        File bootLogFile =
+                new File(resource.baseDirectory + File.separator + "flink-python-udf-boot.log");
+        String msg = "Failed to create stage bundle factory!";
+        if (bootLogFile.exists()) {
+            byte[] output = Files.readAllBytes(bootLogFile.toPath());
+            msg += String.format(" %s", new String(output, Charset.defaultCharset()));
+        }
+        return msg;
+    }
+}
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
index 9da7140..54845eb 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonEnvironmentManagerUtils.java
@@ -37,7 +37,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_WORKING_DIR;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYTHON_WORKING_DIR;
 
 /** Utils used to prepare the python environment. */
 @Internal
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractDataStreamPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractDataStreamPythonFunctionOperator.java
index b8feff9..6685abb 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractDataStreamPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractDataStreamPythonFunctionOperator.java
@@ -34,7 +34,7 @@ import java.util.Map;
 /** Base class for all Python DataStream operators. */
 @Internal
 public abstract class AbstractDataStreamPythonFunctionOperator<OUT>
-        extends AbstractPythonFunctionOperator<OUT> implements ResultTypeQueryable<OUT> {
+        extends AbstractExternalPythonFunctionOperator<OUT> implements ResultTypeQueryable<OUT> {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
new file mode 100644
index 0000000..f23e3d2
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.PythonConfig;
+import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.python.env.embedded.EmbeddedPythonEnvironment;
+import org.apache.flink.python.env.embedded.EmbeddedPythonEnvironmentManager;
+import org.apache.flink.table.functions.python.PythonEnv;
+
+import pemja.core.PythonInterpreter;
+import pemja.core.PythonInterpreterConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.python.env.AbstractPythonEnvironmentManager.PYTHON_WORKING_DIR;
+
+/**
+ * Abstract class for all stream operators to execute Python functions in embedded Python
+ * environment.
+ */
+@Internal
+public abstract class AbstractEmbeddedPythonFunctionOperator<OUT>
+        extends AbstractPythonFunctionOperator<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static ReentrantLock lock = new ReentrantLock();
+
+    private static Map<JobID, Tuple2<String, Integer>> workingDirectories = new HashMap<>();
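+    // job id -> (original working directory, number of open operators of the job in this JVM);
+    // the first operator to open switches into the extracted archives directory and the last one
+    // to close restores the original directory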
+
+    /** The python config. */
+    protected transient PythonConfig pythonConfig;
+
+    /** Every operator holds its own Python interpreter. */
+    protected transient PythonInterpreter interpreter;
+
+    private transient EmbeddedPythonEnvironmentManager pythonEnvironmentManager;
+
+    public AbstractEmbeddedPythonFunctionOperator(Configuration config) {
+        super(config);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        pythonConfig = new PythonConfig(config);
+        pythonEnvironmentManager = createPythonEnvironmentManager();
+        pythonEnvironmentManager.open();
+        EmbeddedPythonEnvironment environment =
+                (EmbeddedPythonEnvironment) pythonEnvironmentManager.createEnvironment();
+
+        PythonInterpreterConfig interpreterConfig = environment.getConfig();
+        interpreter = new PythonInterpreter(interpreterConfig);
+        Map<String, String> env = environment.getEnv();
+
+        if (env.containsKey(PYTHON_WORKING_DIR)) {
+            lock.lockInterruptibly();
+
+            try {
+                JobID jobId = getRuntimeContext().getJobId();
+                Tuple2<String, Integer> dirAndNums;
+
+                if (workingDirectories.containsKey(jobId)) {
+                    dirAndNums = workingDirectories.get(jobId);
+                } else {
+                    dirAndNums = Tuple2.of(null, 0);
+                    workingDirectories.put(jobId, dirAndNums);
+                }
+                dirAndNums.f1 += 1;
+
+                if (dirAndNums.f0 == null) {
+                    // get current directory.
+                    interpreter.exec("import os;cwd = os.getcwd();");
+                    dirAndNums.f0 = interpreter.get("cwd", String.class);
+                    String workingDirectory = env.get(PYTHON_WORKING_DIR);
+                    // set working directory
+                    interpreter.exec(String.format("import os;os.chdir('%s')", workingDirectory));
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        openPythonInterpreter(pythonConfig.getPythonExec(), env, interpreterConfig.getExecType());
+    }
+
+    @Override
+    public void close() throws Exception {
+        try {
+            JobID jobId = getRuntimeContext().getJobId();
+            if (workingDirectories.containsKey(jobId)) {
+                lock.lockInterruptibly();
+
+                try {
+                    Tuple2<String, Integer> dirAndNums = workingDirectories.get(jobId);
+                    dirAndNums.f1 -= 1;
+                    if (dirAndNums.f1 == 0) {
+                        // change to previous working directory.
+                        interpreter.exec(String.format("import os;os.chdir('%s')", dirAndNums.f0));
+                    }
+                } finally {
+                    lock.unlock();
+                }
+            }
+
+            if (interpreter != null) {
+                interpreter.close();
+            }
+
+            if (pythonEnvironmentManager != null) {
+                pythonEnvironmentManager.close();
+            }
+        } finally {
+            super.close();
+        }
+    }
+
+    @Override
+    protected EmbeddedPythonEnvironmentManager createPythonEnvironmentManager() {
+        PythonDependencyInfo dependencyInfo =
+                PythonDependencyInfo.create(
+                        pythonConfig, getRuntimeContext().getDistributedCache());
+        return new EmbeddedPythonEnvironmentManager(
+                dependencyInfo,
+                getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
+                new HashMap<>(System.getenv()),
+                getRuntimeContext().getJobId());
+    }
+
+    /** Setup method for Python Interpreter. It can be used for initialization work. */
+    public abstract void openPythonInterpreter(
+            String pythonExecutable,
+            Map<String, String> env,
+            PythonInterpreterConfig.ExecType execType)
+            throws Exception;
+
+    /** Returns the {@link PythonEnv} used to create PythonEnvironmentManager. */
+    public abstract PythonEnv getPythonEnv();
+
+    /** Gets the proto representation of the Python user-defined functions to be executed. */
+    public abstract FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto();
+}
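
The operator above drives PemJa through a handful of calls that also appear verbatim in this commit: exec runs Python statements, set and get move values between the JVM and the interpreter's globals, and invokeMethod calls a method on a Python object bound to a global name (scalar_operation in the concrete operators). The snippet below is a minimal usage sketch rather than part of the commit; the class name PemjaUsageSketch, the toy Echo class and the assumption that PemJa maps Java Strings to Python str are illustrative only, and the PythonInterpreterConfig is taken as a parameter because it is normally produced by EmbeddedPythonEnvironmentManager.createEnvironment().

    import pemja.core.PythonInterpreter;
    import pemja.core.PythonInterpreterConfig;

    /** A minimal sketch of the PemJa calls used by the embedded operators; not part of this commit. */
    public final class PemjaUsageSketch {

        // The config is assumed to come from EmbeddedPythonEnvironmentManager.createEnvironment(),
        // as in the open() method of AbstractEmbeddedPythonFunctionOperator.
        static String roundTrip(PythonInterpreterConfig config) {
            PythonInterpreter interpreter = new PythonInterpreter(config);
            try {
                // set binds a Java value to a Python global; exec runs statements against those globals.
                interpreter.set("name", "Thread Mode");
                interpreter.exec("greeting = 'hello, ' + name");
                // get copies a Python global back into the JVM.
                String greeting = interpreter.get("greeting", String.class);
                // invokeMethod calls a method on a Python object bound to a global name, which is how
                // the scalar operator drives scalar_operation.open/process_element/close.
                interpreter.exec(
                        "class Echo:\n"
                                + "    def shout(self, s):\n"
                                + "        return s.upper()\n"
                                + "echo = Echo()");
                return (String) interpreter.invokeMethod("echo", "shout", greeting);
            } finally {
                interpreter.close();
            }
        }
    }
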
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java
new file mode 100644
index 0000000..702e42e
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractExternalPythonFunctionOperator.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
+import org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner;
+import org.apache.flink.table.functions.python.PythonEnv;
+
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Abstract class for all stream operators that execute Python functions in an external environment. */
+@Internal
+public abstract class AbstractExternalPythonFunctionOperator<OUT>
+        extends AbstractPythonFunctionOperator<OUT> {
+    /**
+     * The {@link PythonFunctionRunner} which is responsible for Python user-defined function
+     * execution.
+     */
+    protected transient PythonFunctionRunner pythonFunctionRunner;
+
+    private transient ExecutorService flushThreadPool;
+
+    public AbstractExternalPythonFunctionOperator(Configuration config) {
+        super(config);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.pythonFunctionRunner = createPythonFunctionRunner();
+        this.pythonFunctionRunner.open(pythonConfig);
+        this.flushThreadPool = Executors.newSingleThreadExecutor();
+    }
+
+    @Override
+    public void close() throws Exception {
+        try {
+            if (pythonFunctionRunner != null) {
+                pythonFunctionRunner.close();
+                pythonFunctionRunner = null;
+            }
+
+            if (flushThreadPool != null) {
+                flushThreadPool.shutdown();
+                flushThreadPool = null;
+            }
+        } finally {
+            super.close();
+        }
+    }
+
+    @Override
+    protected void invokeFinishBundle() throws Exception {
+        if (elementCount > 0) {
+            AtomicBoolean flushThreadFinish = new AtomicBoolean(false);
+            AtomicReference<Throwable> exceptionReference = new AtomicReference<>();
+            flushThreadPool.submit(
+                    () -> {
+                        try {
+                            pythonFunctionRunner.flush();
+                        } catch (Throwable e) {
+                            exceptionReference.set(e);
+                        } finally {
+                            flushThreadFinish.set(true);
+                            // interrupt the blocking takeResult call so that the main thread
+                            // does not get blocked forever.
+                            ((BeamPythonFunctionRunner) pythonFunctionRunner).notifyNoMoreResults();
+                        }
+                    });
+            Tuple2<byte[], Integer> resultTuple;
+            while (!flushThreadFinish.get()) {
+                resultTuple = pythonFunctionRunner.takeResult();
+                if (resultTuple.f1 != 0) {
+                    emitResult(resultTuple);
+                    emitResults();
+                }
+            }
+            emitResults();
+            Throwable flushThreadThrowable = exceptionReference.get();
+            if (flushThreadThrowable != null) {
+                throw new RuntimeException(
+                        "Error while waiting for BeamPythonFunctionRunner flush",
+                        flushThreadThrowable);
+            }
+            elementCount = 0;
+            lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
+            // callback only after current bundle was fully finalized
+            if (bundleFinishedCallback != null) {
+                bundleFinishedCallback.run();
+                bundleFinishedCallback = null;
+            }
+        }
+    }
+
+    @Override
+    protected ProcessPythonEnvironmentManager createPythonEnvironmentManager() {
+        PythonDependencyInfo dependencyInfo =
+                PythonDependencyInfo.create(
+                        pythonConfig, getRuntimeContext().getDistributedCache());
+        PythonEnv pythonEnv = getPythonEnv();
+        if (pythonEnv.getExecType() == PythonEnv.ExecType.PROCESS) {
+            return new ProcessPythonEnvironmentManager(
+                    dependencyInfo,
+                    getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
+                    new HashMap<>(System.getenv()),
+                    getRuntimeContext().getJobId());
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Execution type '%s' is not supported.", pythonEnv.getExecType()));
+        }
+    }
+
+    protected void emitResults() throws Exception {
+        Tuple2<byte[], Integer> resultTuple;
+        while ((resultTuple = pythonFunctionRunner.pollResult()) != null && resultTuple.f1 != 0) {
+            emitResult(resultTuple);
+        }
+    }
+
+    /** Sends the execution result to the downstream operator. */
+    public abstract void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception;
+
+    /**
+     * Creates the {@link PythonFunctionRunner} which is responsible for Python user-defined
+     * function execution.
+     */
+    public abstract PythonFunctionRunner createPythonFunctionRunner() throws Exception;
+}
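
invokeFinishBundle above overlaps two activities: a single-threaded executor flushes the Beam runner while the task thread keeps consuming results via takeResult, and notifyNoMoreResults wakes a consumer that is still blocked once the flush has finished. The following standalone sketch models that handshake with plain JDK types; it is not part of this commit, and the queue, the wake-up marker string and the class name FlushHandshakeSketch merely stand in for the BeamPythonFunctionRunner plumbing. The property carried over from the operator is that the consumer never blocks forever, because the producer always flips the finished flag before offering the wake-up element.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicBoolean;

    /** Simplified model of the flush/takeResult handshake; not part of this commit. */
    public final class FlushHandshakeSketch {

        public static void main(String[] args) throws Exception {
            BlockingQueue<String> results = new LinkedBlockingQueue<>();
            String wakeUp = "<no-more-results>"; // plays the role of notifyNoMoreResults()
            AtomicBoolean flushFinished = new AtomicBoolean(false);
            ExecutorService flushThreadPool = Executors.newSingleThreadExecutor();

            // The flush thread produces the remaining results, then unblocks a waiting consumer.
            flushThreadPool.submit(
                    () -> {
                        try {
                            for (int i = 0; i < 3; i++) {
                                results.put("result-" + i);
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            flushFinished.set(true);
                            results.offer(wakeUp); // wake up a consumer parked in take()
                        }
                    });

            // The main thread keeps draining results while the flush is running, mirroring
            // the takeResult()/emitResult() loop in invokeFinishBundle.
            while (!flushFinished.get()) {
                String result = results.take();
                if (!result.equals(wakeUp)) {
                    System.out.println("emit " + result);
                }
            }
            // Drain whatever was produced between the last take() and the flag flip.
            String leftover;
            while ((leftover = results.poll()) != null && !leftover.equals(wakeUp)) {
                System.out.println("emit " + leftover);
            }
            flushThreadPool.shutdown();
        }
    }
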
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index d321370..aa1dfe1 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -19,36 +19,26 @@
 package org.apache.flink.streaming.api.operators.python;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.python.PythonConfig;
-import org.apache.flink.python.PythonFunctionRunner;
 import org.apache.flink.python.PythonOptions;
-import org.apache.flink.python.env.PythonDependencyInfo;
 import org.apache.flink.python.env.PythonEnvironmentManager;
-import org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend;
-import org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.table.functions.python.PythonEnv;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.WrappingRuntimeException;
 
 import java.lang.reflect.Field;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.streaming.api.utils.ClassLeakCleaner.cleanUpLeakingClasses;
 import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode;
@@ -61,12 +51,6 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
 
     protected Configuration config;
 
-    /**
-     * The {@link PythonFunctionRunner} which is responsible for Python user-defined function
-     * execution.
-     */
-    protected transient PythonFunctionRunner pythonFunctionRunner;
-
     /** Max number of elements to include in a bundle. */
     protected transient int maxBundleSize;
 
@@ -83,15 +67,13 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
     private transient long maxBundleTimeMills;
 
     /** Time that the last bundle was finished. */
-    private transient long lastFinishBundleTime;
+    protected transient long lastFinishBundleTime;
 
     /** A timer that finishes the current bundle after a fixed amount of time. */
     private transient ScheduledFuture<?> checkFinishBundleTimer;
 
     /** Callback to be executed after the current bundle was finished. */
-    private transient Runnable bundleFinishedCallback;
-
-    private transient ExecutorService flushThreadPool;
+    protected transient Runnable bundleFinishedCallback;
 
     public AbstractPythonFunctionOperator(Configuration config) {
         this.config = Preconditions.checkNotNull(config);
@@ -127,9 +109,6 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
                         this.maxBundleTimeMills);
             }
 
-            this.pythonFunctionRunner = createPythonFunctionRunner();
-            this.pythonFunctionRunner.open(pythonConfig);
-
             this.elementCount = 0;
             this.lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
 
@@ -143,7 +122,6 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
                                     timestamp -> checkInvokeFinishBundleByTime(),
                                     bundleCheckPeriod,
                                     bundleCheckPeriod);
-            this.flushThreadPool = Executors.newSingleThreadExecutor();
         } finally {
             super.open();
         }
@@ -165,14 +143,6 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
                 checkFinishBundleTimer.cancel(true);
                 checkFinishBundleTimer = null;
             }
-            if (pythonFunctionRunner != null) {
-                pythonFunctionRunner.close();
-                pythonFunctionRunner = null;
-            }
-            if (flushThreadPool != null) {
-                flushThreadPool.shutdown();
-                flushThreadPool = null;
-            }
         } finally {
             super.close();
 
@@ -296,24 +266,12 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
         return config;
     }
 
-    /**
-     * Creates the {@link PythonFunctionRunner} which is responsible for Python user-defined
-     * function execution.
-     */
-    public abstract PythonFunctionRunner createPythonFunctionRunner() throws Exception;
-
     /** Returns the {@link PythonEnv} used to create PythonEnvironmentManager. */
     public abstract PythonEnv getPythonEnv();
 
-    /** Sends the execution result to the downstream operator. */
-    public abstract void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception;
+    protected abstract void invokeFinishBundle() throws Exception;
 
-    protected void emitResults() throws Exception {
-        Tuple2<byte[], Integer> resultTuple;
-        while ((resultTuple = pythonFunctionRunner.pollResult()) != null && resultTuple.f1 != 0) {
-            emitResult(resultTuple);
-        }
-    }
+    protected abstract PythonEnvironmentManager createPythonEnvironmentManager();
 
     /** Checks whether to invoke finishBundle by elements count. Called in processElement. */
     protected void checkInvokeFinishBundleByCount() throws Exception {
@@ -330,66 +288,6 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
         }
     }
 
-    protected void invokeFinishBundle() throws Exception {
-        if (elementCount > 0) {
-            AtomicBoolean flushThreadFinish = new AtomicBoolean(false);
-            AtomicReference<Throwable> exceptionReference = new AtomicReference<>();
-            flushThreadPool.submit(
-                    () -> {
-                        try {
-                            pythonFunctionRunner.flush();
-                        } catch (Throwable e) {
-                            exceptionReference.set(e);
-                        } finally {
-                            flushThreadFinish.set(true);
-                            // interrupt the progress of takeResult to avoid the main thread is
-                            // blocked forever.
-                            ((BeamPythonFunctionRunner) pythonFunctionRunner).notifyNoMoreResults();
-                        }
-                    });
-            Tuple2<byte[], Integer> resultTuple;
-            while (!flushThreadFinish.get()) {
-                resultTuple = pythonFunctionRunner.takeResult();
-                if (resultTuple.f1 != 0) {
-                    emitResult(resultTuple);
-                    emitResults();
-                }
-            }
-            emitResults();
-            Throwable flushThreadThrowable = exceptionReference.get();
-            if (flushThreadThrowable != null) {
-                throw new RuntimeException(
-                        "Error while waiting for BeamPythonFunctionRunner flush",
-                        flushThreadThrowable);
-            }
-            elementCount = 0;
-            lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
-            // callback only after current bundle was fully finalized
-            if (bundleFinishedCallback != null) {
-                bundleFinishedCallback.run();
-                bundleFinishedCallback = null;
-            }
-        }
-    }
-
-    protected PythonEnvironmentManager createPythonEnvironmentManager() {
-        PythonDependencyInfo dependencyInfo =
-                PythonDependencyInfo.create(
-                        pythonConfig, getRuntimeContext().getDistributedCache());
-        PythonEnv pythonEnv = getPythonEnv();
-        if (pythonEnv.getExecType() == PythonEnv.ExecType.PROCESS) {
-            return new ProcessPythonEnvironmentManager(
-                    dependencyInfo,
-                    getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
-                    new HashMap<>(System.getenv()),
-                    getRuntimeContext().getJobId());
-        } else {
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "Execution type '%s' is not supported.", pythonEnv.getExecType()));
-        }
-    }
-
     protected FlinkMetricContainer getFlinkMetricContainer() {
         return this.pythonConfig.isMetricEnabled()
                 ? new FlinkMetricContainer(getRuntimeContext().getMetricGroup())
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
index 7347498..a76871f 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.runners.python.beam;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.state.KeyedStateBackend;
@@ -68,7 +68,7 @@ public class BeamDataStreamPythonFunctionRunner extends BeamPythonFunctionRunner
 
     public BeamDataStreamPythonFunctionRunner(
             String taskName,
-            PythonEnvironmentManager environmentManager,
+            ProcessPythonEnvironmentManager environmentManager,
             String headOperatorFunctionUrn,
             List<FlinkFnApi.UserDefinedDataStreamFunction> userDefinedDataStreamFunctions,
             Map<String, String> jobOptions,
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 0498ea9..86264ab 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -26,9 +26,9 @@ import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.PythonConfig;
 import org.apache.flink.python.PythonFunctionRunner;
 import org.apache.flink.python.PythonOptions;
-import org.apache.flink.python.env.ProcessPythonEnvironment;
 import org.apache.flink.python.env.PythonEnvironment;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironment;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.memory.OpaqueMemoryResource;
@@ -109,8 +109,8 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
 
     private final String taskName;
 
-    /** The Python execution environment manager. */
-    private final PythonEnvironmentManager environmentManager;
+    /** The Python process execution environment manager. */
+    private final ProcessPythonEnvironmentManager environmentManager;
 
     /** The options used to configure the Python worker process. */
     private final Map<String, String> jobOptions;
@@ -177,7 +177,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
 
     public BeamPythonFunctionRunner(
             String taskName,
-            PythonEnvironmentManager environmentManager,
+            ProcessPythonEnvironmentManager environmentManager,
             Map<String, String> jobOptions,
             @Nullable FlinkMetricContainer flinkMetricContainer,
             @Nullable KeyedStateBackend keyedStateBackend,
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractOneInputPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractOneInputPythonFunctionOperator.java
index 503bc09..92dbbd5 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractOneInputPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractOneInputPythonFunctionOperator.java
@@ -22,12 +22,12 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
+import org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator;
 
 /** Base class for all one input stream operators to execute Python functions. */
 @Internal
 public abstract class AbstractOneInputPythonFunctionOperator<IN, OUT>
-        extends AbstractPythonFunctionOperator<OUT>
+        extends AbstractExternalPythonFunctionOperator<OUT>
         implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
 
     private static final long serialVersionUID = 1L;
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
new file mode 100644
index 0000000..848b678
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.python.AbstractEmbeddedPythonFunctionOperator;
+import org.apache.flink.streaming.api.utils.ProtoUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.operators.python.utils.StreamRecordRowDataWrappingCollector;
+import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import pemja.core.PythonInterpreterConfig;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The Python {@link ScalarFunction} operator in an embedded Python environment. */
+@Internal
+public class EmbeddedPythonScalarFunctionOperator
+        extends AbstractEmbeddedPythonFunctionOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The Python {@link ScalarFunction}s to be executed. */
+    private final PythonFunctionInfo[] scalarFunctions;
+
+    /** The offsets of user-defined function inputs. */
+    private final int[] udfInputOffsets;
+
+    /** The input logical type. */
+    protected final RowType inputType;
+
+    /** The user-defined function input logical type. */
+    protected final RowType udfInputType;
+
+    /** The user-defined function output logical type. */
+    protected final RowType udfOutputType;
+
+    private GeneratedProjection forwardedFieldGeneratedProjection;
+
+    /** The reused GenericRowData holding the execution result of the Python udf. */
+    private GenericRowData reuseResultRowData;
+
+    /** The collector used to collect records. */
+    private transient StreamRecordRowDataWrappingCollector rowDataWrapper;
+
+    /** The Projection which projects the forwarded fields from the input row. */
+    private transient Projection<RowData, BinaryRowData> forwardedFieldProjection;
+
+    private transient PythonTypeUtils.DataConverter[] userDefinedFunctionInputConverters;
+    private transient Object[] userDefinedFunctionInputArgs;
+    private transient PythonTypeUtils.DataConverter[] userDefinedFunctionOutputConverters;
+
+    /** Whether there is only one input argument. */
+    private transient boolean isOneArg;
+
+    /** Whether the udf result contains only one field. */
+    private transient boolean isOneFieldResult;
+
+    public EmbeddedPythonScalarFunctionOperator(
+            Configuration config,
+            PythonFunctionInfo[] scalarFunctions,
+            RowType inputType,
+            RowType udfInputType,
+            RowType udfOutputType,
+            int[] udfInputOffsets) {
+        super(config);
+        this.inputType = Preconditions.checkNotNull(inputType);
+        this.udfInputType = Preconditions.checkNotNull(udfInputType);
+        this.udfOutputType = Preconditions.checkNotNull(udfOutputType);
+        this.udfInputOffsets = Preconditions.checkNotNull(udfInputOffsets);
+        this.scalarFunctions = Preconditions.checkNotNull(scalarFunctions);
+    }
+
+    public EmbeddedPythonScalarFunctionOperator(
+            Configuration config,
+            PythonFunctionInfo[] scalarFunctions,
+            RowType inputType,
+            RowType udfInputType,
+            RowType udfOutputType,
+            int[] udfInputOffsets,
+            GeneratedProjection forwardedFieldGeneratedProjection) {
+        this(config, scalarFunctions, inputType, udfInputType, udfOutputType, udfInputOffsets);
+        this.forwardedFieldGeneratedProjection =
+                Preconditions.checkNotNull(forwardedFieldGeneratedProjection);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void open() throws Exception {
+        isOneArg = udfInputOffsets.length == 1;
+        isOneFieldResult = udfOutputType.getFieldCount() == 1;
+        super.open();
+        rowDataWrapper = new StreamRecordRowDataWrappingCollector(output);
+        reuseResultRowData = new GenericRowData(udfOutputType.getFieldCount());
+        RowType userDefinedFunctionInputType =
+                new RowType(
+                        Arrays.stream(udfInputOffsets)
+                                .mapToObj(i -> inputType.getFields().get(i))
+                                .collect(Collectors.toList()));
+        userDefinedFunctionInputConverters =
+                userDefinedFunctionInputType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(PythonTypeUtils::toDataConverter)
+                        .toArray(PythonTypeUtils.DataConverter[]::new);
+        userDefinedFunctionInputArgs = new Object[udfInputOffsets.length];
+        userDefinedFunctionOutputConverters =
+                udfOutputType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(PythonTypeUtils::toDataConverter)
+                        .toArray(PythonTypeUtils.DataConverter[]::new);
+
+        if (forwardedFieldGeneratedProjection != null) {
+            forwardedFieldProjection =
+                    forwardedFieldGeneratedProjection.newInstance(
+                            Thread.currentThread().getContextClassLoader());
+        }
+    }
+
+    @Override
+    public void openPythonInterpreter(
+            String pythonExecutable,
+            Map<String, String> env,
+            PythonInterpreterConfig.ExecType execType)
+            throws Exception {
+        if (execType.equals(PythonInterpreterConfig.ExecType.SUB_INTERPRETER)) {
+            LOG.info("Create Operation in sub-interpreters.");
+            String[] commands =
+                    new String[] {
+                        pythonExecutable,
+                        "-c",
+                        String.format(
+                                "from pyflink.fn_execution.utils.operation_utils import create_serialized_scalar_operation_from_proto;"
+                                        + "print(create_serialized_scalar_operation_from_proto(%s, %s, %s))",
+                                Arrays.toString(getUserDefinedFunctionsProto().toByteArray()),
+                                isOneArg ? "True" : "False",
+                                isOneFieldResult ? "True" : "False")
+                    };
+            interpreter.exec(
+                    "from pyflink.fn_execution.utils.operation_utils import deserialized_operation_from_serialized_bytes");
+            interpreter.exec(
+                    String.format(
+                            "scalar_operation = deserialized_operation_from_serialized_bytes(%s)",
+                            executeScript(commands, env)));
+        } else {
+            LOG.info("Create Operation in multi-threads.");
+
+            // The CPython extension included in proto does not support being initialized
+            // multiple times, so the single interpreter process is made responsible for
+            // initialization and proto parsing. That interpreter parses the proto and
+            // serializes the function operations with cloudpickle.
+            interpreter.exec(
+                    "from pyflink.fn_execution.utils.operation_utils import create_scalar_operation_from_proto");
+            interpreter.set("proto", getUserDefinedFunctionsProto().toByteArray());
+
+            interpreter.exec(
+                    String.format(
+                            "scalar_operation = create_scalar_operation_from_proto(proto, %s, %s)",
+                            isOneArg ? "True" : "False", isOneFieldResult ? "True" : "False"));
+        }
+
+        // invoke `open` method of ScalarOperation.
+        interpreter.invokeMethod("scalar_operation", "open");
+    }
+
+    @Override
+    public void endInput() {
+        if (interpreter != null) {
+            // invoke `close` method of ScalarOperation.
+            interpreter.invokeMethod("scalar_operation", "close");
+        }
+    }
+
+    @Override
+    public PythonEnv getPythonEnv() {
+        return scalarFunctions[0].getPythonFunction().getPythonEnv();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void processElement(StreamRecord<RowData> element) {
+        RowData value = element.getValue();
+
+        Object udfArgs = null;
+        if (userDefinedFunctionInputArgs.length > 1) {
+            for (int i = 0; i < userDefinedFunctionInputArgs.length; i++) {
+                userDefinedFunctionInputArgs[i] =
+                        userDefinedFunctionInputConverters[i].toExternal(value, udfInputOffsets[i]);
+            }
+            udfArgs = userDefinedFunctionInputArgs;
+        } else if (userDefinedFunctionInputArgs.length == 1) {
+            udfArgs = userDefinedFunctionInputConverters[0].toExternal(value, udfInputOffsets[0]);
+        }
+
+        if (isOneFieldResult) {
+            Object udfResult =
+                    interpreter.invokeMethod("scalar_operation", "process_element", udfArgs);
+            reuseResultRowData.setField(
+                    0, userDefinedFunctionOutputConverters[0].toInternal(udfResult));
+        } else {
+            Object[] udfResult =
+                    (Object[])
+                            interpreter.invokeMethod(
+                                    "scalar_operation", "process_element", udfArgs);
+            for (int i = 0; i < udfResult.length; i++) {
+                reuseResultRowData.setField(
+                        i, userDefinedFunctionOutputConverters[i].toInternal(udfResult[i]));
+            }
+        }
+
+        if (forwardedFieldProjection != null) {
+            BinaryRowData forwardedRowData = forwardedFieldProjection.apply(value).copy();
+            JoinedRowData reuseJoinedRow = new JoinedRowData(forwardedRowData, reuseResultRowData);
+            rowDataWrapper.collect(reuseJoinedRow);
+        } else {
+            rowDataWrapper.collect(reuseResultRowData);
+        }
+    }
+
+    @Override
+    protected void invokeFinishBundle() throws Exception {
+        // TODO: Support invoking in batches.
+    }
+
+    @Override
+    public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() {
+        FlinkFnApi.UserDefinedFunctions.Builder builder =
+                FlinkFnApi.UserDefinedFunctions.newBuilder();
+        // add udf proto
+        for (PythonFunctionInfo pythonFunctionInfo : scalarFunctions) {
+            builder.addUdfs(ProtoUtils.getUserDefinedFunctionProto(pythonFunctionInfo));
+        }
+        builder.setMetricEnabled(pythonConfig.isMetricEnabled());
+        builder.setProfileEnabled(pythonConfig.isProfileEnabled());
+        return builder.build();
+    }
+
+    private String executeScript(final String[] commands, Map<String, String> env)
+            throws IOException {
+        ProcessBuilder pb = new ProcessBuilder(commands);
+        pb.environment().putAll(env);
+        pb.redirectErrorStream(true);
+        Process p = pb.start();
+        InputStream in = new BufferedInputStream(p.getInputStream());
+        StringBuilder out = new StringBuilder();
+        String s;
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(in))) {
+            while ((s = br.readLine()) != null) {
+                out.append(s).append("\n");
+            }
+        }
+        try {
+            if (p.waitFor() != 0) {
+                throw new IOException(
+                        String.format(
+                                "Failed to execute the command: %s\noutput: %s",
+                                String.join(" ", commands), out));
+            }
+        } catch (InterruptedException e) {
+            // Ignored. The subprocess is dead after "br.readLine()" returns null, so the call to
+            // "waitFor" should return immediately.
+        }
+        return out.toString();
+    }
+}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
index c0d86d6..d502a47 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.runners.python.beam;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.state.KeyedStateBackend;
@@ -54,7 +54,7 @@ public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner {
 
     public BeamTablePythonFunctionRunner(
             String taskName,
-            PythonEnvironmentManager environmentManager,
+            ProcessPythonEnvironmentManager environmentManager,
             String functionUrn,
             GeneratedMessageV3 userDefinedFunctionProto,
             Map<String, String> jobOptions,
@@ -119,7 +119,7 @@ public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner {
 
     public static BeamTablePythonFunctionRunner stateless(
             String taskName,
-            PythonEnvironmentManager environmentManager,
+            ProcessPythonEnvironmentManager environmentManager,
             String functionUrn,
             GeneratedMessageV3 userDefinedFunctionProto,
             Map<String, String> jobOptions,
@@ -146,7 +146,7 @@ public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner {
 
     public static BeamTablePythonFunctionRunner stateful(
             String taskName,
-            PythonEnvironmentManager environmentManager,
+            ProcessPythonEnvironmentManager environmentManager,
             String functionUrn,
             GeneratedMessageV3 userDefinedFunctionProto,
             Map<String, String> jobOptions,
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
index b04d12b..8d7a832 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
@@ -30,6 +30,8 @@ import org.apache.flink.api.common.typeutils.base.ShortSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.runtime.typeutils.serializers.python.ArrayDataSerializer;
 import org.apache.flink.table.runtime.typeutils.serializers.python.DecimalDataSerializer;
 import org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer;
@@ -56,9 +58,12 @@ import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+import org.apache.flink.table.types.utils.TypeConversions;
 
+import java.io.Serializable;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
+import java.sql.Time;
 import java.util.TimeZone;
 
 /**
@@ -84,6 +89,10 @@ public final class PythonTypeUtils {
         return logicalType.accept(new LogicalTypetoInternalSerializerConverter());
     }
 
+    public static DataConverter toDataConverter(LogicalType logicalType) {
+        return logicalType.accept(new LogicalTypeToDataConverter());
+    }
+
     /**
      * Convert the specified bigDecimal according to the specified precision and scale. The
      * specified bigDecimal may be rounded to have the specified scale and then the specified
@@ -458,4 +467,267 @@ public final class PythonTypeUtils {
                             logicalType.asSummaryString()));
         }
     }
+
+    /** Data converter between Flink's internal data format and the Java objects used by PemJa. */
+    public abstract static class DataConverter<IN, INTER, OUT> implements Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private final DataFormatConverters.DataFormatConverter<IN, INTER> dataFormatConverter;
+
+        public DataConverter(
+                DataFormatConverters.DataFormatConverter<IN, INTER> dataFormatConverter) {
+            this.dataFormatConverter = dataFormatConverter;
+        }
+
+        public final IN toInternal(OUT value) {
+            return dataFormatConverter.toInternal(toInternalImpl(value));
+        }
+
+        public final OUT toExternal(RowData row, int column) {
+            return toExternalImpl(dataFormatConverter.toExternal(row, column));
+        }
+
+        abstract INTER toInternalImpl(OUT value);
+
+        abstract OUT toExternalImpl(INTER value);
+    }
+
+    /** Identity data converter. */
+    public static final class IdentityDataConverter<IN, OUT> extends DataConverter<IN, OUT, OUT> {
+        IdentityDataConverter(
+                DataFormatConverters.DataFormatConverter<IN, OUT> dataFormatConverter) {
+            super(dataFormatConverter);
+        }
+
+        @Override
+        OUT toInternalImpl(OUT value) {
+            return value;
+        }
+
+        @Override
+        OUT toExternalImpl(OUT value) {
+            return value;
+        }
+    }
+
+    /**
+     * Python Long will be converted to Long in PemJa, so we need ByteDataConverter to convert Java
+     * Long to internal Byte.
+     */
+    public static final class ByteDataConverter extends DataConverter<Byte, Byte, Long> {
+
+        public static final ByteDataConverter INSTANCE = new ByteDataConverter();
+
+        private ByteDataConverter() {
+            super(DataFormatConverters.ByteConverter.INSTANCE);
+        }
+
+        @Override
+        Byte toInternalImpl(Long value) {
+            return value.byteValue();
+        }
+
+        @Override
+        Long toExternalImpl(Byte value) {
+            return value.longValue();
+        }
+    }
+
+    /**
+     * Python Long will be converted to Long in PemJa, so we need ShortDataConverter to convert Java
+     * Long to internal Short.
+     */
+    public static final class ShortDataConverter extends DataConverter<Short, Short, Long> {
+
+        public static final ShortDataConverter INSTANCE = new ShortDataConverter();
+
+        private ShortDataConverter() {
+            super(DataFormatConverters.ShortConverter.INSTANCE);
+        }
+
+        @Override
+        Short toInternalImpl(Long value) {
+            return value.shortValue();
+        }
+
+        @Override
+        Long toExternalImpl(Short value) {
+            return value.longValue();
+        }
+    }
+
+    /**
+     * Python Long will be converted to Long in PemJa, so we need IntDataConverter to convert Java
+     * Long to internal Integer.
+     */
+    public static final class IntDataConverter extends DataConverter<Integer, Integer, Long> {
+
+        public static final IntDataConverter INSTANCE = new IntDataConverter();
+
+        private IntDataConverter() {
+            super(DataFormatConverters.IntConverter.INSTANCE);
+        }
+
+        @Override
+        Integer toInternalImpl(Long value) {
+            return value.intValue();
+        }
+
+        @Override
+        Long toExternalImpl(Integer value) {
+            return value.longValue();
+        }
+    }
+
+    /**
+     * Python Float will be converted to Double in PemJa, so we need FloatDataConverter to convert
+     * Java Double to internal Float.
+     */
+    public static final class FloatDataConverter extends DataConverter<Float, Float, Double> {
+
+        public static final FloatDataConverter INSTANCE = new FloatDataConverter();
+
+        private FloatDataConverter() {
+            super(DataFormatConverters.FloatConverter.INSTANCE);
+        }
+
+        @Override
+        Float toInternalImpl(Double value) {
+            return value.floatValue();
+        }
+
+        @Override
+        Double toExternalImpl(Float value) {
+            return value.doubleValue();
+        }
+    }
+
+    /**
+     * Python datetime.time will be converted to Time in PemJa, so we need TimeDataConverter to
+     * convert the Java Time to an internal Integer.
+     */
+    public static final class TimeDataConverter extends DataConverter<Integer, Integer, Time> {
+
+        public static final TimeDataConverter INSTANCE = new TimeDataConverter();
+
+        private TimeDataConverter() {
+            super(DataFormatConverters.IntConverter.INSTANCE);
+        }
+
+        @Override
+        Integer toInternalImpl(Time value) {
+            return (int) value.getTime();
+        }
+
+        @Override
+        Time toExternalImpl(Integer value) {
+            return new Time(value);
+        }
+    }
+
+    private static final class LogicalTypeToDataConverter
+            extends LogicalTypeDefaultVisitor<DataConverter> {
+
+        @Override
+        public DataConverter visit(BooleanType booleanType) {
+            return defaultConverter(booleanType);
+        }
+
+        @Override
+        public DataConverter visit(TinyIntType tinyIntType) {
+            return ByteDataConverter.INSTANCE;
+        }
+
+        @Override
+        public DataConverter visit(SmallIntType smallIntType) {
+            return ShortDataConverter.INSTANCE;
+        }
+
+        @Override
+        public DataConverter visit(IntType intType) {
+            return IntDataConverter.INSTANCE;
+        }
+
+        @Override
+        public DataConverter visit(BigIntType bigIntType) {
+            return defaultConverter(bigIntType);
+        }
+
+        @Override
+        public DataConverter visit(FloatType floatType) {
+            return FloatDataConverter.INSTANCE;
+        }
+
+        @Override
+        public DataConverter visit(DoubleType doubleType) {
+            return defaultConverter(doubleType);
+        }
+
+        @Override
+        public DataConverter visit(DecimalType decimalType) {
+            return defaultConverter(decimalType);
+        }
+
+        @Override
+        public DataConverter visit(VarCharType varCharType) {
+            return defaultConverter(varCharType);
+        }
+
+        @Override
+        public DataConverter visit(CharType charType) {
+            return defaultConverter(charType);
+        }
+
+        @Override
+        public DataConverter visit(VarBinaryType varBinaryType) {
+            return defaultConverter(varBinaryType);
+        }
+
+        @Override
+        public DataConverter visit(BinaryType binaryType) {
+            return defaultConverter(binaryType);
+        }
+
+        @Override
+        public DataConverter visit(DateType dateType) {
+            return new IdentityDataConverter<>(DataFormatConverters.DateConverter.INSTANCE);
+        }
+
+        @Override
+        public DataConverter visit(TimeType timeType) {
+            return TimeDataConverter.INSTANCE;
+        }
+
+        @Override
+        public DataConverter visit(TimestampType timestampType) {
+            return new IdentityDataConverter<>(
+                    new DataFormatConverters.TimestampConverter(timestampType.getPrecision()));
+        }
+
+        @Override
+        public DataConverter visit(ArrayType arrayType) {
+            return defaultConverter(arrayType);
+        }
+
+        @Override
+        public DataConverter visit(MapType mapType) {
+            return defaultConverter(mapType);
+        }
+
+        @Override
+        protected DataConverter defaultMethod(LogicalType logicalType) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Currently, Python UDF doesn't support logical type %s in Thread Mode.",
+                            logicalType.asSummaryString()));
+        }
+
+        @SuppressWarnings("unchecked")
+        private DataConverter defaultConverter(LogicalType logicalType) {
+            return new IdentityDataConverter<>(
+                    DataFormatConverters.getConverterForDataType(
+                            TypeConversions.fromLogicalToDataType(logicalType)));
+        }
+    }
 }
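
The DataConverter hierarchy added above bridges PemJa's Java view of Python values (int as Long, float as Double, datetime.time as Time) and Flink's internal row format, which is what lets EmbeddedPythonScalarFunctionOperator convert each field before and after calling process_element. Below is a small usage sketch, not part of this commit; the class name DataConverterSketch is illustrative only.

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;

    /** Usage sketch of the DataConverter round trip for an INT column; not part of this commit. */
    public final class DataConverterSketch {

        public static void main(String[] args) {
            // For an INT column, toDataConverter(new IntType()) resolves to this same instance.
            PythonTypeUtils.IntDataConverter converter = PythonTypeUtils.IntDataConverter.INSTANCE;

            // Internal -> external: read column 0 of an internal row as the Long handed to Python.
            Long toPython = converter.toExternal(GenericRowData.of(42), 0);

            // External -> internal: narrow the Long coming back from Python to the internal Integer.
            Integer fromPython = converter.toInternal(toPython);

            System.out.println(toPython + " -> " + fromPython); // prints "42 -> 42"
        }
    }
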
diff --git a/flink-python/src/main/resources/META-INF/NOTICE b/flink-python/src/main/resources/META-INF/NOTICE
index 21d0508..fa97855 100644
--- a/flink-python/src/main/resources/META-INF/NOTICE
+++ b/flink-python/src/main/resources/META-INF/NOTICE
@@ -27,6 +27,7 @@ This project bundles the following dependencies under the Apache Software Licens
 - org.apache.beam:beam-vendor-sdks-java-extensions-protobuf:2.27.0
 - org.apache.beam:beam-vendor-guava-26_0-jre:0.1
 - org.apache.beam:beam-vendor-grpc-1_26_0:0.3
+- com.alibaba:pemja:0.1.2
 
 This project bundles the following dependencies under the BSD license.
 See bundled license files for details
diff --git a/flink-python/src/test/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManagerTest.java b/flink-python/src/test/java/org/apache/flink/python/env/process/ProcessPythonEnvironmentManagerTest.java
similarity index 95%
rename from flink-python/src/test/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManagerTest.java
rename to flink-python/src/test/java/org/apache/flink/python/env/process/ProcessPythonEnvironmentManagerTest.java
index b1a16be..aca4748 100644
--- a/flink-python/src/test/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManagerTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/env/process/ProcessPythonEnvironmentManagerTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.python.env.beam;
+package org.apache.flink.python.env.process;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.python.env.PythonDependencyInfo;
@@ -44,14 +44,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYFLINK_GATEWAY_DISABLED;
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_ARCHIVES_DIR;
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_FILES_DIR;
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_REQUIREMENTS_CACHE;
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_REQUIREMENTS_DIR;
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_REQUIREMENTS_FILE;
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_REQUIREMENTS_INSTALL_DIR;
-import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_WORKING_DIR;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYFLINK_GATEWAY_DISABLED;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYTHON_ARCHIVES_DIR;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYTHON_FILES_DIR;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYTHON_REQUIREMENTS_CACHE;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYTHON_REQUIREMENTS_DIR;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYTHON_REQUIREMENTS_FILE;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYTHON_REQUIREMENTS_INSTALL_DIR;
+import static org.apache.flink.python.env.process.ProcessPythonEnvironmentManager.PYTHON_WORKING_DIR;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java
index cde0ef0..a5bf90a 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java
@@ -196,7 +196,7 @@ public class PassThroughPythonStreamGroupWindowAggregateOperator<K>
     public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
         return new PassThroughStreamGroupWindowAggregatePythonFunctionRunner(
                 getRuntimeContext().getTaskName(),
-                PythonTestUtils.createTestEnvironmentManager(),
+                PythonTestUtils.createTestProcessEnvironmentManager(),
                 userDefinedFunctionInputType,
                 userDefinedFunctionOutputType,
                 STREAM_GROUP_WINDOW_AGGREGATE_URN,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java
index 54a6adf..d5472fc 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java
@@ -257,7 +257,7 @@ public class PythonStreamGroupAggregateOperatorTest
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughStreamAggregatePythonFunctionRunner(
                     getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
+                    PythonTestUtils.createTestProcessEnvironmentManager(),
                     userDefinedFunctionInputType,
                     outputType,
                     STREAM_GROUP_AGGREGATE_URN,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java
index f325b8a..bdef22d 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java
@@ -270,7 +270,7 @@ public class PythonStreamGroupTableAggregateOperatorTest
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughStreamTableAggregatePythonFunctionRunner(
                     getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
+                    PythonTestUtils.createTestProcessEnvironmentManager(),
                     userDefinedFunctionInputType,
                     outputType,
                     STREAM_GROUP_TABLE_AGGREGATE_URN,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java
index 254cb86..e758119 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java
@@ -233,7 +233,7 @@ public class BatchArrowPythonGroupAggregateFunctionOperatorTest
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonAggregateFunctionRunner(
                     getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
+                    PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
                     udfOutputType,
                     getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java
index d631bf5..a6ec6db 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java
@@ -402,7 +402,7 @@ public class BatchArrowPythonGroupWindowAggregateFunctionOperatorTest
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonAggregateFunctionRunner(
                     getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
+                    PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
                     udfOutputType,
                     getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java
index c272ed4..1fee476 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java
@@ -303,7 +303,7 @@ public class BatchArrowPythonOverWindowAggregateFunctionOperatorTest
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonAggregateFunctionRunner(
                     getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
+                    PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
                     udfOutputType,
                     getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
index dae1313..e762222 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
@@ -517,7 +517,7 @@ public class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonAggregateFunctionRunner(
                     getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
+                    PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
                     udfOutputType,
                     getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java
index 9c410d9..a95157c 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java
@@ -163,7 +163,7 @@ public class StreamArrowPythonProcTimeBoundedRangeOperatorTest
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonAggregateFunctionRunner(
                     getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
+                    PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
                     udfOutputType,
                     getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java
index 3664a1e..b65a0fb 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java
@@ -164,7 +164,7 @@ public class StreamArrowPythonProcTimeBoundedRowsOperatorTest
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonAggregateFunctionRunner(
                     getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
+                    PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
                     udfOutputType,
                     getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
index b7cf02b..849c6b0 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
@@ -319,7 +319,7 @@ public class StreamArrowPythonRowTimeBoundedRangeOperatorTest
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonAggregateFunctionRunner(
                     getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
+                    PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
                     udfOutputType,
                     getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
index acc6c82..1ee0e7f 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
@@ -289,7 +289,7 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonAggregateFunctionRunner(
                     getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
+                    PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
                     udfOutputType,
                     getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
index 96c7d88..17ae33c 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
@@ -147,7 +147,7 @@ public class PythonScalarFunctionOperatorTest
         public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
             return new PassThroughPythonScalarFunctionRunner(
                     getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
+                    PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
                     udfOutputType,
                     getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
index 5e52d80..953afb0 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
@@ -146,7 +146,7 @@ public class ArrowPythonScalarFunctionOperatorTest
         public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
             return new PassThroughPythonScalarFunctionRunner(
                     getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
+                    PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
                     udfOutputType,
                     getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
index 75ad3e7..840503c 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
@@ -128,7 +128,7 @@ public class PythonTableFunctionOperatorTest
         public PythonFunctionRunner createPythonFunctionRunner() {
             return new PassThroughPythonTableFunctionRunner(
                     getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
+                    PythonTestUtils.createTestProcessEnvironmentManager(),
                     udfInputType,
                     udfOutputType,
                     getFunctionUrn(),
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java
index e79d4d4..4804a9d 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.runtime.typeutils;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
@@ -64,6 +66,17 @@ public class PythonTypeUtilsTest {
     }
 
     @Test
+    public void testLogicalTypeToDataConverter() {
+        PythonTypeUtils.DataConverter converter = PythonTypeUtils.toDataConverter(new IntType());
+
+        GenericRowData data = new GenericRowData(1);
+        data.setField(0, 10);
+        Object externalData = converter.toExternal(data, 0);
+        assertTrue(externalData instanceof Long);
+        assertEquals(externalData, 10L);
+    }
+
+    @Test
     public void testUnsupportedTypeSerializer() {
         LogicalType logicalType =
                 new UnresolvedUserDefinedType(UnresolvedIdentifier.of("cat", "db", "MyType"));
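(The new testLogicalTypeToDataConverter case above covers the data converters used by the embedded Thread Mode path: an internal INT field comes back in its external representation, a Long, per the assertions. Below is a minimal usage sketch based only on the calls visible in that hunk; the wrapping class and the println are illustrative, not part of the commit.)

    // Sketch: reusing only PythonTypeUtils.toDataConverter and DataConverter#toExternal
    // as shown in the test above; everything else here is illustrative.
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
    import org.apache.flink.table.types.logical.IntType;

    public class DataConverterSketch {
        public static void main(String[] args) {
            PythonTypeUtils.DataConverter converter = PythonTypeUtils.toDataConverter(new IntType());
            GenericRowData row = new GenericRowData(1);
            row.setField(0, 10); // internal INT representation
            Object external = converter.toExternal(row, 0);
            System.out.println(external); // a Long (10L), according to the assertions above
        }
    }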
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
index 0752bbd..fb16068 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
@@ -24,7 +24,7 @@ import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.PythonConfig;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
@@ -67,7 +67,7 @@ public class PassThroughPythonAggregateFunctionRunner extends BeamTablePythonFun
 
     public PassThroughPythonAggregateFunctionRunner(
             String taskName,
-            PythonEnvironmentManager environmentManager,
+            ProcessPythonEnvironmentManager environmentManager,
             RowType inputType,
             RowType outputType,
             String functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
index 7638e56..4815507 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.runtime.utils;
 
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
 import org.apache.flink.table.types.logical.RowType;
@@ -43,7 +43,7 @@ public class PassThroughPythonScalarFunctionRunner extends BeamTablePythonFuncti
 
     public PassThroughPythonScalarFunctionRunner(
             String taskName,
-            PythonEnvironmentManager environmentManager,
+            ProcessPythonEnvironmentManager environmentManager,
             RowType inputType,
             RowType outputType,
             String functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
index e4cd02d..9469422 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.runtime.utils;
 
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
 import org.apache.flink.table.types.logical.RowType;
@@ -45,7 +45,7 @@ public class PassThroughPythonTableFunctionRunner extends BeamTablePythonFunctio
 
     public PassThroughPythonTableFunctionRunner(
             String taskName,
-            PythonEnvironmentManager environmentManager,
+            ProcessPythonEnvironmentManager environmentManager,
             RowType inputType,
             RowType outputType,
             String functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
index 14e147f..e2886005 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.utils;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
@@ -49,7 +49,7 @@ public class PassThroughStreamAggregatePythonFunctionRunner extends BeamTablePyt
 
     public PassThroughStreamAggregatePythonFunctionRunner(
             String taskName,
-            PythonEnvironmentManager environmentManager,
+            ProcessPythonEnvironmentManager environmentManager,
             RowType inputType,
             RowType outputType,
             String functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
index 12e0e2f..7f977f2 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.utils;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.table.runtime.operators.python.aggregate.PassThroughPythonStreamGroupWindowAggregateOperator;
@@ -45,7 +45,7 @@ public class PassThroughStreamGroupWindowAggregatePythonFunctionRunner
 
     public PassThroughStreamGroupWindowAggregatePythonFunctionRunner(
             String taskName,
-            PythonEnvironmentManager environmentManager,
+            ProcessPythonEnvironmentManager environmentManager,
             RowType inputType,
             RowType outputType,
             String functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
index b84b63a..f9ed8ac 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.utils;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
@@ -51,7 +51,7 @@ public class PassThroughStreamTableAggregatePythonFunctionRunner
 
     public PassThroughStreamTableAggregatePythonFunctionRunner(
             String taskName,
-            PythonEnvironmentManager environmentManager,
+            ProcessPythonEnvironmentManager environmentManager,
             RowType inputType,
             RowType outputType,
             String functionUrn,
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
index 82ac09b..932d9fa 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
@@ -20,8 +20,7 @@ package org.apache.flink.table.runtime.utils;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.python.env.PythonDependencyInfo;
-import org.apache.flink.python.env.PythonEnvironmentManager;
-import org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager;
+import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.python.util.PythonEnvironmentManagerUtils;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
@@ -86,7 +85,7 @@ public final class PythonTestUtils {
                         "root"));
     }
 
-    public static PythonEnvironmentManager createTestEnvironmentManager() {
+    public static ProcessPythonEnvironmentManager createTestProcessEnvironmentManager() {
         Map<String, String> env = new HashMap<>();
         env.put(PythonEnvironmentManagerUtils.PYFLINK_UDF_RUNNER_DIR, "");
         return new ProcessPythonEnvironmentManager(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
index a877824..db17df4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
@@ -74,6 +74,10 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
             "org.apache.flink.table.runtime.operators.python.scalar."
                     + "PythonScalarFunctionOperator";
 
+    private static final String EMBEDDED_PYTHON_SCALAR_FUNCTION_OPERATOR_NAME =
+            "org.apache.flink.table.runtime.operators.python.scalar."
+                    + "EmbeddedPythonScalarFunctionOperator";
+
     private static final String ARROW_PYTHON_SCALAR_FUNCTION_OPERATOR_NAME =
             "org.apache.flink.table.runtime.operators.python.scalar.arrow."
                     + "ArrowPythonScalarFunctionOperator";
@@ -207,10 +211,15 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
             int[] forwardedFields,
             boolean isArrow) {
         Class<?> clazz;
+        boolean isInProcessMode = CommonPythonUtil.isPythonWorkerInProcessMode(mergedConfig);
         if (isArrow) {
             clazz = CommonPythonUtil.loadClass(ARROW_PYTHON_SCALAR_FUNCTION_OPERATOR_NAME);
         } else {
-            clazz = CommonPythonUtil.loadClass(PYTHON_SCALAR_FUNCTION_OPERATOR_NAME);
+            if (isInProcessMode) {
+                clazz = CommonPythonUtil.loadClass(PYTHON_SCALAR_FUNCTION_OPERATOR_NAME);
+            } else {
+                clazz = CommonPythonUtil.loadClass(EMBEDDED_PYTHON_SCALAR_FUNCTION_OPERATOR_NAME);
+            }
         }
 
         final RowType inputType = inputRowTypeInfo.toRowType();
@@ -224,34 +233,79 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
                                 .project(outputType);
 
         try {
-            Constructor<?> ctor =
-                    clazz.getConstructor(
-                            Configuration.class,
-                            PythonFunctionInfo[].class,
-                            RowType.class,
-                            RowType.class,
-                            RowType.class,
-                            GeneratedProjection.class,
-                            GeneratedProjection.class);
-            return (OneInputStreamOperator<RowData, RowData>)
-                    ctor.newInstance(
-                            mergedConfig,
-                            pythonFunctionInfos,
-                            inputType,
-                            udfInputType,
-                            udfOutputType,
-                            ProjectionCodeGenerator.generateProjection(
-                                    CodeGeneratorContext.apply(tableConfig),
-                                    "UdfInputProjection",
+            if (isInProcessMode) {
+                Constructor<?> ctor =
+                        clazz.getConstructor(
+                                Configuration.class,
+                                PythonFunctionInfo[].class,
+                                RowType.class,
+                                RowType.class,
+                                RowType.class,
+                                GeneratedProjection.class,
+                                GeneratedProjection.class);
+                return (OneInputStreamOperator<RowData, RowData>)
+                        ctor.newInstance(
+                                mergedConfig,
+                                pythonFunctionInfos,
+                                inputType,
+                                udfInputType,
+                                udfOutputType,
+                                ProjectionCodeGenerator.generateProjection(
+                                        CodeGeneratorContext.apply(tableConfig),
+                                        "UdfInputProjection",
+                                        inputType,
+                                        udfInputType,
+                                        udfInputOffsets),
+                                ProjectionCodeGenerator.generateProjection(
+                                        CodeGeneratorContext.apply(tableConfig),
+                                        "ForwardedFieldProjection",
+                                        inputType,
+                                        forwardedFieldType,
+                                        forwardedFields));
+            } else {
+                if (forwardedFields.length > 0) {
+                    Constructor<?> ctor =
+                            clazz.getConstructor(
+                                    Configuration.class,
+                                    PythonFunctionInfo[].class,
+                                    RowType.class,
+                                    RowType.class,
+                                    RowType.class,
+                                    int[].class,
+                                    GeneratedProjection.class);
+                    return (OneInputStreamOperator<RowData, RowData>)
+                            ctor.newInstance(
+                                    mergedConfig,
+                                    pythonFunctionInfos,
                                     inputType,
                                     udfInputType,
-                                    udfInputOffsets),
-                            ProjectionCodeGenerator.generateProjection(
-                                    CodeGeneratorContext.apply(tableConfig),
-                                    "ForwardedFieldProjection",
+                                    udfOutputType,
+                                    udfInputOffsets,
+                                    ProjectionCodeGenerator.generateProjection(
+                                            CodeGeneratorContext.apply(tableConfig),
+                                            "ForwardedFieldProjection",
+                                            inputType,
+                                            forwardedFieldType,
+                                            forwardedFields));
+                } else {
+                    Constructor<?> ctor =
+                            clazz.getConstructor(
+                                    Configuration.class,
+                                    PythonFunctionInfo[].class,
+                                    RowType.class,
+                                    RowType.class,
+                                    RowType.class,
+                                    int[].class);
+                    return (OneInputStreamOperator<RowData, RowData>)
+                            ctor.newInstance(
+                                    mergedConfig,
+                                    pythonFunctionInfos,
                                     inputType,
-                                    forwardedFieldType,
-                                    forwardedFields));
+                                    udfInputType,
+                                    udfOutputType,
+                                    udfInputOffsets);
+                }
+            }
         } catch (Exception e) {
             throw new TableException("Python Scalar Function Operator constructed failed.", e);
         }
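(The CommonExecPythonCalc change above selects the operator class based on the execution mode: process mode keeps the existing Beam-backed PythonScalarFunctionOperator with its two GeneratedProjection arguments, while any other mode instantiates the new EmbeddedPythonScalarFunctionOperator, which takes the UDF input offsets directly as an int[] and only needs a forwarded-field projection when forwarded fields exist. A condensed sketch of that selection, using only names that appear in the hunk; the helper class itself is illustrative.)

    // Sketch of the operator selection introduced above; constant values and method names
    // are taken from the hunk, the wrapping class is not the actual planner code.
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil;

    final class OperatorSelectionSketch {
        static Class<?> selectScalarOperator(Configuration mergedConfig, boolean isArrow) {
            if (isArrow) {
                return CommonPythonUtil.loadClass(
                        "org.apache.flink.table.runtime.operators.python.scalar.arrow.ArrowPythonScalarFunctionOperator");
            }
            if (CommonPythonUtil.isPythonWorkerInProcessMode(mergedConfig)) {
                // Process mode keeps the existing Beam-based operator.
                return CommonPythonUtil.loadClass(
                        "org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperator");
            }
            // Thread mode uses the embedded operator added in this commit.
            return CommonPythonUtil.loadClass(
                    "org.apache.flink.table.runtime.operators.python.scalar.EmbeddedPythonScalarFunctionOperator");
        }
    }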
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
index a7c3024..87b9b33 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
@@ -158,6 +158,20 @@ public class CommonPythonUtil {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    public static boolean isPythonWorkerInProcessMode(Configuration config) {
+        Class clazz = loadClass("org.apache.flink.python.PythonOptions");
+        try {
+            return config.getString(
+                            (ConfigOption<String>)
+                                    (clazz.getField("PYTHON_EXECUTION_MODE").get(null)))
+                    .equalsIgnoreCase("process");
+
+        } catch (IllegalAccessException | NoSuchFieldException e) {
+            throw new TableException("Field PYTHON_EXECUTION_MODE accessed failed.", e);
+        }
+    }
+
     public static Tuple2<PythonAggregateFunctionInfo[], DataViewSpec[][]>
             extractPythonAggregateFunctionInfos(
                     AggregateInfoList pythonAggregateInfoList, AggregateCall[] aggCalls) {