You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2020/07/14 07:13:57 UTC

[flink] branch master updated: [FLINK-18491][python] Extract the Beam specific coder classes into a separate Python module

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ca6cef9  [FLINK-18491][python] Extract the Beam specific coder classes into a separate Python module
ca6cef9 is described below

commit ca6cef968efd4153e5de620977784454f0fb0c1f
Author: huangxingbo <hx...@gmail.com>
AuthorDate: Wed Jul 8 19:36:29 2020 +0800

    [FLINK-18491][python] Extract the Beam specific coder classes into a separate Python module
    
    This closes #12870.
---
 flink-python/bin/pyflink-udf-runner.sh             |   2 +-
 .../{sdk_worker_main.py => beam/__init__.py}       |  24 --
 .../fn_execution/{boot.py => beam/beam_boot.py}    |   2 +-
 .../beam_coder_impl.pxd}                           |  33 +-
 .../pyflink/fn_execution/beam/beam_coder_impl.pyx  |  58 ++++
 .../pyflink/fn_execution/beam/beam_coders.py       | 213 +++++++++++++
 .../beam_sdk_worker_main.py}                       |  11 +-
 .../beam_slow_coder_impl.py}                       |   0
 .../{sdk_worker_main.py => beam/beam_stream.pxd}   |  37 +--
 .../pyflink/fn_execution/beam/beam_stream.pyx      | 106 +++++++
 flink-python/pyflink/fn_execution/coders.py        | 318 +++++--------------
 .../pyflink/fn_execution/fast_coder_impl.pxd       | 138 +++-----
 .../pyflink/fn_execution/fast_coder_impl.pyx       | 347 +++++++--------------
 .../pyflink/fn_execution/fast_operations.pxd       |   5 +-
 .../pyflink/fn_execution/fast_operations.pyx       |  33 +-
 .../{sdk_worker_main.py => stream.pxd}             |  29 +-
 .../{sdk_worker_main.py => stream.pyx}             |  33 +-
 .../pyflink/fn_execution/tests/test_coders.py      |   7 +-
 .../pyflink/fn_execution/tests/test_fast_coders.py |  20 +-
 .../fn_execution/tests/test_process_mode_boot.py   |   5 +-
 flink-python/setup.py                              |  29 +-
 .../python/beam/BeamPythonFunctionRunner.java      |  10 +-
 22 files changed, 735 insertions(+), 725 deletions(-)

diff --git a/flink-python/bin/pyflink-udf-runner.sh b/flink-python/bin/pyflink-udf-runner.sh
index 04bdbc7..7c0c557 100755
--- a/flink-python/bin/pyflink-udf-runner.sh
+++ b/flink-python/bin/pyflink-udf-runner.sh
@@ -40,4 +40,4 @@ if [[ "$_PYTHON_WORKING_DIR" != "" ]]; then
 fi
 
 log="$BOOT_LOG_DIR/flink-python-udf-boot.log"
-${python} -m pyflink.fn_execution.boot $@ 2>&1 | tee ${log}
+${python} -m pyflink.fn_execution.beam.beam_boot $@ 2>&1 | tee ${log}
diff --git a/flink-python/pyflink/fn_execution/sdk_worker_main.py b/flink-python/pyflink/fn_execution/beam/__init__.py
similarity index 56%
copy from flink-python/pyflink/fn_execution/sdk_worker_main.py
copy to flink-python/pyflink/fn_execution/beam/__init__.py
index 5e750af..65b48d4 100644
--- a/flink-python/pyflink/fn_execution/sdk_worker_main.py
+++ b/flink-python/pyflink/fn_execution/beam/__init__.py
@@ -15,27 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import os
-import sys
-
-# force to register the operations to SDK Harness
-from apache_beam.options.pipeline_options import PipelineOptions
-
-try:
-    import pyflink.fn_execution.fast_operations
-except ImportError:
-    import pyflink.fn_execution.operations
-
-# force to register the coders to SDK Harness
-import pyflink.fn_execution.coders # noqa # pylint: disable=unused-import
-
-import apache_beam.runners.worker.sdk_worker_main
-
-if 'PIPELINE_OPTIONS' in os.environ:
-    pipeline_options = apache_beam.runners.worker.sdk_worker_main._parse_pipeline_options(
-        os.environ['PIPELINE_OPTIONS'])
-else:
-    pipeline_options = PipelineOptions.from_dictionary({})
-
-if __name__ == '__main__':
-    apache_beam.runners.worker.sdk_worker_main.main(sys.argv)
diff --git a/flink-python/pyflink/fn_execution/boot.py b/flink-python/pyflink/fn_execution/beam/beam_boot.py
similarity index 98%
rename from flink-python/pyflink/fn_execution/boot.py
rename to flink-python/pyflink/fn_execution/beam/beam_boot.py
index 1286c91..f460ae6 100644
--- a/flink-python/pyflink/fn_execution/boot.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_boot.py
@@ -103,5 +103,5 @@ if __name__ == "__main__":
     if "FLINK_BOOT_TESTING" in os.environ and os.environ["FLINK_BOOT_TESTING"] == "1":
         exit(0)
 
-    call([python_exec, "-m", "pyflink.fn_execution.sdk_worker_main"],
+    call([python_exec, "-m", "pyflink.fn_execution.beam.beam_sdk_worker_main"],
          stdout=sys.stdout, stderr=sys.stderr, env=env)
diff --git a/flink-python/pyflink/fn_execution/sdk_worker_main.py b/flink-python/pyflink/fn_execution/beam/beam_coder_impl.pxd
similarity index 56%
copy from flink-python/pyflink/fn_execution/sdk_worker_main.py
copy to flink-python/pyflink/fn_execution/beam/beam_coder_impl.pxd
index 5e750af..de58441 100644
--- a/flink-python/pyflink/fn_execution/sdk_worker_main.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_coder_impl.pxd
@@ -15,27 +15,22 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import os
-import sys
+# cython: language_level = 3
+# cython: infer_types = True
+# cython: profile=True
+# cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True
 
-# force to register the operations to SDK Harness
-from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.coders.coder_impl cimport StreamCoderImpl
 
-try:
-    import pyflink.fn_execution.fast_operations
-except ImportError:
-    import pyflink.fn_execution.operations
+from pyflink.fn_execution.fast_coder_impl cimport BaseCoderImpl
+from pyflink.fn_execution.stream cimport InputStream
 
-# force to register the coders to SDK Harness
-import pyflink.fn_execution.coders # noqa # pylint: disable=unused-import
+cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
+    cdef readonly StreamCoderImpl _value_coder
 
-import apache_beam.runners.worker.sdk_worker_main
+cdef class BeamCoderImpl(StreamCoderImpl):
+    cdef readonly BaseCoderImpl _value_coder
 
-if 'PIPELINE_OPTIONS' in os.environ:
-    pipeline_options = apache_beam.runners.worker.sdk_worker_main._parse_pipeline_options(
-        os.environ['PIPELINE_OPTIONS'])
-else:
-    pipeline_options = PipelineOptions.from_dictionary({})
-
-if __name__ == '__main__':
-    apache_beam.runners.worker.sdk_worker_main.main(sys.argv)
+cdef class InputStreamWrapper:
+    cdef BaseCoderImpl _value_coder
+    cdef InputStream _input_stream
diff --git a/flink-python/pyflink/fn_execution/beam/beam_coder_impl.pyx b/flink-python/pyflink/fn_execution/beam/beam_coder_impl.pyx
new file mode 100644
index 0000000..9874030
--- /dev/null
+++ b/flink-python/pyflink/fn_execution/beam/beam_coder_impl.pyx
@@ -0,0 +1,58 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# cython: language_level = 3
+# cython: infer_types = True
+# cython: profile=True
+# cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True
+
+from apache_beam.coders.coder_impl cimport InputStream as BInputStream
+from apache_beam.coders.coder_impl cimport OutputStream as BOutputStream
+from apache_beam.coders.coder_impl cimport StreamCoderImpl
+
+from pyflink.fn_execution.beam.beam_stream cimport BeamInputStream
+
+cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
+    def __cinit__(self, value_coder):
+        self._value_coder = value_coder
+
+    cpdef encode_to_stream(self, value, BOutputStream out_stream, bint nested):
+        self._value_coder.encode_to_stream(value, out_stream, nested)
+
+    cpdef decode_from_stream(self, BInputStream in_stream, bint nested):
+        return self._value_coder.decode_from_stream(in_stream, nested)
+
+    cpdef get_estimated_size_and_observables(self, value, bint nested=False):
+        return 0, []
+
+cdef class BeamCoderImpl(StreamCoderImpl):
+    def __cinit__(self, value_coder):
+        self._value_coder = value_coder
+
+    cpdef encode_to_stream(self, value, BOutputStream out_stream, bint nested):
+        self._value_coder.encode(value, out_stream)
+
+    cpdef decode_from_stream(self, BInputStream in_stream, bint nested):
+        cdef BeamInputStream input_stream = BeamInputStream(in_stream, in_stream.size())
+        cdef InputStreamWrapper input_stream_wrapper = InputStreamWrapper(self._value_coder,
+                                                                          input_stream)
+        return input_stream_wrapper
+
+cdef class InputStreamWrapper:
+    def __cinit__(self, value_coder, input_stream):
+        self._value_coder = value_coder
+        self._input_stream = input_stream
diff --git a/flink-python/pyflink/fn_execution/beam/beam_coders.py b/flink-python/pyflink/fn_execution/beam/beam_coders.py
new file mode 100644
index 0000000..6c284c9
--- /dev/null
+++ b/flink-python/pyflink/fn_execution/beam/beam_coders.py
@@ -0,0 +1,213 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import pyarrow as pa
+import pytz
+from apache_beam.coders import Coder
+from apache_beam.coders.coders import FastCoder, LengthPrefixCoder
+from apache_beam.portability import common_urns
+from apache_beam.typehints import typehints
+
+from pyflink.fn_execution.beam import beam_slow_coder_impl
+
+try:
+    from pyflink.fn_execution import fast_coder_impl as coder_impl
+    from pyflink.fn_execution.beam.beam_coder_impl import BeamCoderImpl, \
+        PassThroughLengthPrefixCoderImpl
+except ImportError:
+    coder_impl = beam_slow_coder_impl
+    BeamCoderImpl = lambda a: a
+    PassThroughLengthPrefixCoderImpl = coder_impl.PassThroughLengthPrefixCoderImpl
+
+from pyflink.fn_execution import flink_fn_execution_pb2, coders
+from pyflink.table.types import TinyIntType, SmallIntType, IntType, BigIntType, BooleanType, \
+    FloatType, DoubleType, VarCharType, VarBinaryType, DecimalType, DateType, TimeType, \
+    LocalZonedTimestampType, RowType, RowField, to_arrow_type, TimestampType, ArrayType
+
+
+class PassThroughLengthPrefixCoder(LengthPrefixCoder):
+    """
+    Coder which doesn't prefix the length of the encoded object as the length prefix will be handled
+    by the wrapped value coder.
+    """
+
+    def __init__(self, value_coder):
+        super(PassThroughLengthPrefixCoder, self).__init__(value_coder)
+
+    def _create_impl(self):
+        return PassThroughLengthPrefixCoderImpl(self._value_coder.get_impl())
+
+    def __repr__(self):
+        return 'PassThroughLengthPrefixCoder[%s]' % self._value_coder
+
+
+Coder.register_structured_urn(
+    common_urns.coders.LENGTH_PREFIX.urn, PassThroughLengthPrefixCoder)
+
+
+class BeamTableFunctionRowCoder(FastCoder):
+    """
+    Coder for Table Function Row.
+    """
+
+    def __init__(self, table_function_row_coder):
+        self._table_function_row_coder = table_function_row_coder
+
+    def _create_impl(self):
+        return self._table_function_row_coder.get_impl()
+
+    def get_impl(self):
+        return BeamCoderImpl(self._create_impl())
+
+    def to_type_hint(self):
+        return typehints.List
+
+    @Coder.register_urn(coders.FLINK_TABLE_FUNCTION_SCHEMA_CODER_URN, flink_fn_execution_pb2.Schema)
+    def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
+        return BeamTableFunctionRowCoder(
+            coders.TableFunctionRowCoder.from_schema_proto(schema_proto))
+
+    def __repr__(self):
+        return 'TableFunctionRowCoder[%s]' % repr(self._table_function_row_coder)
+
+    def __eq__(self, other):
+        return (self.__class__ == other.__class__
+                and self._table_function_row_coder == other._table_function_row_coder)
+
+    def __ne__(self, other):
+        return not self == other
+
+    def __hash__(self):
+        return hash(self._table_function_row_coder)
+
+
+class BeamFlattenRowCoder(FastCoder):
+    """
+    Coder for Row. The decoded result will be flattened as a list of column values of a row instead
+    of a row object.
+    """
+
+    def __init__(self, flatten_coder):
+        self._flatten_coder = flatten_coder
+
+    def _create_impl(self):
+        return self._flatten_coder.get_impl()
+
+    def get_impl(self):
+        return BeamCoderImpl(self._create_impl())
+
+    def to_type_hint(self):
+        return typehints.List
+
+    @Coder.register_urn(coders.FLINK_SCALAR_FUNCTION_SCHEMA_CODER_URN,
+                        flink_fn_execution_pb2.Schema)
+    def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
+        return BeamFlattenRowCoder(coders.FlattenRowCoder.from_schema_proto(schema_proto))
+
+    def __repr__(self):
+        return 'BeamFlattenRowCoder[%s]' % repr(self._flatten_coder)
+
+    def __eq__(self, other):
+        return (self.__class__ == other.__class__
+                and self._flatten_coder == other._flatten_coder)
+
+    def __ne__(self, other):
+        return not self == other
+
+    def __hash__(self):
+        return hash(self._flatten_coder)
+
+
+class ArrowCoder(FastCoder):
+    """
+    Coder for Arrow.
+    """
+
+    def __init__(self, schema, row_type, timezone):
+        self._schema = schema
+        self._row_type = row_type
+        self._timezone = timezone
+
+    def _create_impl(self):
+        return beam_slow_coder_impl.ArrowCoderImpl(self._schema, self._row_type, self._timezone)
+
+    def to_type_hint(self):
+        import pandas as pd
+        return pd.Series
+
+    @Coder.register_urn(coders.FLINK_SCALAR_FUNCTION_SCHEMA_ARROW_CODER_URN,
+                        flink_fn_execution_pb2.Schema)
+    def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
+
+        def _to_arrow_schema(row_type):
+            return pa.schema([pa.field(n, to_arrow_type(t), t._nullable)
+                              for n, t in zip(row_type.field_names(), row_type.field_types())])
+
+        def _to_data_type(field_type):
+            if field_type.type_name == flink_fn_execution_pb2.Schema.TINYINT:
+                return TinyIntType(field_type.nullable)
+            elif field_type.type_name == flink_fn_execution_pb2.Schema.SMALLINT:
+                return SmallIntType(field_type.nullable)
+            elif field_type.type_name == flink_fn_execution_pb2.Schema.INT:
+                return IntType(field_type.nullable)
+            elif field_type.type_name == flink_fn_execution_pb2.Schema.BIGINT:
+                return BigIntType(field_type.nullable)
+            elif field_type.type_name == flink_fn_execution_pb2.Schema.BOOLEAN:
+                return BooleanType(field_type.nullable)
+            elif field_type.type_name == flink_fn_execution_pb2.Schema.FLOAT:
+                return FloatType(field_type.nullable)
+            elif field_type.type_name == flink_fn_execution_pb2.Schema.DOUBLE:
+                return DoubleType(field_type.nullable)
+            elif field_type.type_name == flink_fn_execution_pb2.Schema.VARCHAR:
+                return VarCharType(0x7fffffff, field_type.nullable)
+            elif field_type.type_name == flink_fn_execution_pb2.Schema.VARBINARY:
+                return VarBinaryType(0x7fffffff, field_type.nullable)
+            elif field_type.type_name == flink_fn_execution_pb2.Schema.DECIMAL:
+                return DecimalType(field_type.decimal_info.precision,
+                                   field_type.decimal_info.scale,
+                                   field_type.nullable)
+            elif field_type.type_name == flink_fn_execution_pb2.Schema.DATE:
+                return DateType(field_type.nullable)
+            elif field_type.type_name == flink_fn_execution_pb2.Schema.TIME:
+                return TimeType(field_type.time_info.precision, field_type.nullable)
+            elif field_type.type_name == \
+                    flink_fn_execution_pb2.Schema.LOCAL_ZONED_TIMESTAMP:
+                return LocalZonedTimestampType(field_type.local_zoned_timestamp_info.precision,
+                                               field_type.nullable)
+            elif field_type.type_name == flink_fn_execution_pb2.Schema.TIMESTAMP:
+                return TimestampType(field_type.timestamp_info.precision, field_type.nullable)
+            elif field_type.type_name == flink_fn_execution_pb2.Schema.ARRAY:
+                return ArrayType(_to_data_type(field_type.collection_element_type),
+                                 field_type.nullable)
+            elif field_type.type_name == flink_fn_execution_pb2.Schema.TypeName.ROW:
+                return RowType(
+                    [RowField(f.name, _to_data_type(f.type), f.description)
+                     for f in field_type.row_schema.fields], field_type.nullable)
+            else:
+                raise ValueError("field_type %s is not supported." % field_type)
+
+        def _to_row_type(row_schema):
+            return RowType([RowField(f.name, _to_data_type(f.type)) for f in row_schema.fields])
+
+        timezone = pytz.timezone(os.environ['table.exec.timezone'])
+        row_type = _to_row_type(schema_proto)
+        return ArrowCoder(_to_arrow_schema(row_type), row_type, timezone)
+
+    def __repr__(self):
+        return 'ArrowCoder[%s]' % self._schema
diff --git a/flink-python/pyflink/fn_execution/sdk_worker_main.py b/flink-python/pyflink/fn_execution/beam/beam_sdk_worker_main.py
similarity index 77%
copy from flink-python/pyflink/fn_execution/sdk_worker_main.py
copy to flink-python/pyflink/fn_execution/beam/beam_sdk_worker_main.py
index 5e750af..48f7a31 100644
--- a/flink-python/pyflink/fn_execution/sdk_worker_main.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_sdk_worker_main.py
@@ -15,27 +15,18 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import os
 import sys
 
 # force to register the operations to SDK Harness
-from apache_beam.options.pipeline_options import PipelineOptions
-
 try:
     import pyflink.fn_execution.fast_operations
 except ImportError:
     import pyflink.fn_execution.operations
 
 # force to register the coders to SDK Harness
-import pyflink.fn_execution.coders # noqa # pylint: disable=unused-import
+import pyflink.fn_execution.beam.beam_coders # noqa # pylint: disable=unused-import
 
 import apache_beam.runners.worker.sdk_worker_main
 
-if 'PIPELINE_OPTIONS' in os.environ:
-    pipeline_options = apache_beam.runners.worker.sdk_worker_main._parse_pipeline_options(
-        os.environ['PIPELINE_OPTIONS'])
-else:
-    pipeline_options = PipelineOptions.from_dictionary({})
-
 if __name__ == '__main__':
     apache_beam.runners.worker.sdk_worker_main.main(sys.argv)
diff --git a/flink-python/pyflink/fn_execution/coder_impl.py b/flink-python/pyflink/fn_execution/beam/beam_slow_coder_impl.py
similarity index 100%
rename from flink-python/pyflink/fn_execution/coder_impl.py
rename to flink-python/pyflink/fn_execution/beam/beam_slow_coder_impl.py
diff --git a/flink-python/pyflink/fn_execution/sdk_worker_main.py b/flink-python/pyflink/fn_execution/beam/beam_stream.pxd
similarity index 55%
copy from flink-python/pyflink/fn_execution/sdk_worker_main.py
copy to flink-python/pyflink/fn_execution/beam/beam_stream.pxd
index 5e750af..4876550 100644
--- a/flink-python/pyflink/fn_execution/sdk_worker_main.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_stream.pxd
@@ -15,27 +15,24 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import os
-import sys
+# cython: language_level = 3
 
-# force to register the operations to SDK Harness
-from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.coders.coder_impl cimport InputStream as BInputStream
+from apache_beam.coders.coder_impl cimport OutputStream as BOutputStream
 
-try:
-    import pyflink.fn_execution.fast_operations
-except ImportError:
-    import pyflink.fn_execution.operations
+from pyflink.fn_execution.stream cimport InputStream, OutputStream
 
-# force to register the coders to SDK Harness
-import pyflink.fn_execution.coders # noqa # pylint: disable=unused-import
+cdef class BeamInputStream(InputStream):
+    cdef char*_input_data
+    cdef size_t _input_buffer_size
+    cdef size_t _input_pos
+    cdef void _parse_input_stream(self, BInputStream input_stream)
 
-import apache_beam.runners.worker.sdk_worker_main
-
-if 'PIPELINE_OPTIONS' in os.environ:
-    pipeline_options = apache_beam.runners.worker.sdk_worker_main._parse_pipeline_options(
-        os.environ['PIPELINE_OPTIONS'])
-else:
-    pipeline_options = PipelineOptions.from_dictionary({})
-
-if __name__ == '__main__':
-    apache_beam.runners.worker.sdk_worker_main.main(sys.argv)
+cdef class BeamOutputStream(OutputStream):
+    cdef char*_output_data
+    cdef size_t _output_pos
+    cdef size_t _output_buffer_size
+    cdef BOutputStream _output_stream
+    cdef void _map_output_data_to_output_stream(self)
+    cdef void _maybe_flush(self)
+    cdef void _parse_output_stream(self, BOutputStream output_stream)
diff --git a/flink-python/pyflink/fn_execution/beam/beam_stream.pyx b/flink-python/pyflink/fn_execution/beam/beam_stream.pyx
new file mode 100644
index 0000000..d461470
--- /dev/null
+++ b/flink-python/pyflink/fn_execution/beam/beam_stream.pyx
@@ -0,0 +1,106 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# cython: language_level = 3
+# cython: infer_types = True
+# cython: profile=True
+# cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True
+
+from libc.stdlib cimport realloc
+from libc.string cimport memcpy
+
+cdef class BeamInputStream(InputStream):
+    def __cinit__(self, input_stream, size):
+        self._input_buffer_size = size
+        self._input_pos = 0
+        self._parse_input_stream(input_stream)
+
+    cdef size_t read(self, char** data):
+        cdef size_t length = 0
+        cdef bint has_prefix = True
+        cdef size_t shift = 0
+        cdef char bits
+        # read the var-int size
+        while has_prefix:
+            bits = self._input_data[self._input_pos] & 0x7F
+            length |= bits << shift
+            shift += 7
+            if not (self._input_data[self._input_pos] & 0x80):
+                has_prefix = False
+            self._input_pos += 1
+        data[0] = self._input_data + self._input_pos
+        self._input_pos += length
+        return length
+
+    cdef size_t available(self):
+        return self._input_buffer_size - self._input_pos
+
+    cdef void _parse_input_stream(self, BInputStream input_stream):
+        self._input_data = input_stream.allc
+        input_stream.pos = self._input_buffer_size
+
+cdef class BeamOutputStream(OutputStream):
+    def __cinit__(self, output_stream):
+        self._output_stream = output_stream
+        self._parse_output_stream(output_stream)
+
+    cdef void write(self, char*data, size_t length):
+        cdef char bits
+        cdef size_t size = length
+        # the variable-length size prefix will be less than 9 bytes
+        if self._output_buffer_size < self._output_pos + length + 9:
+            self._output_buffer_size += length + 9
+            self._output_data = <char*> realloc(self._output_data,
+                                                self._output_buffer_size)
+        # write variable prefix length
+        while size:
+            bits = size & 0x7F
+            size >>= 7
+            if size:
+                bits |= 0x80
+            self._output_data[self._output_pos] = bits
+            self._output_pos += 1
+
+        if length < 8:
+            # This is faster than memcpy when the string is short.
+            for i in range(length):
+                self._output_data[self._output_pos + i] = data[i]
+        else:
+            memcpy(self._output_data + self._output_pos, data, length)
+        self._output_pos += length
+        self._maybe_flush()
+
+    cpdef void flush(self):
+        cdef size_t i
+        self._map_output_data_to_output_stream()
+        self._output_stream.maybe_flush()
+
+    cdef void _parse_output_stream(self, BOutputStream output_stream):
+        self._output_data = output_stream.data
+        self._output_pos = output_stream.pos
+        self._output_buffer_size = output_stream.buffer_size
+
+    cdef void _maybe_flush(self):
+        if self._output_pos > 10_000_000:
+            self._map_output_data_to_output_stream()
+            self._output_stream.flush()
+            self._output_pos = 0
+
+    cdef void _map_output_data_to_output_stream(self):
+        self._output_stream.data = self._output_data
+        self._output_stream.pos = self._output_pos
+        self._output_stream.buffer_size = self._output_buffer_size
diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py
index d2c9e79..8969f31 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -16,58 +16,51 @@
 # limitations under the License.
 ################################################################################
 
+import os
 from abc import ABC
 
-
-import datetime
-import decimal
-import pyarrow as pa
 import pytz
-from apache_beam.coders import Coder
-from apache_beam.coders.coders import FastCoder, LengthPrefixCoder
-from apache_beam.options.pipeline_options import DebugOptions
-from apache_beam.portability import common_urns
-from apache_beam.typehints import typehints
 
-from pyflink.fn_execution import coder_impl as slow_coder_impl
+from pyflink.fn_execution import flink_fn_execution_pb2
+
 try:
     from pyflink.fn_execution import fast_coder_impl as coder_impl
-except ImportError:
-    coder_impl = slow_coder_impl
-from pyflink.fn_execution import flink_fn_execution_pb2
-from pyflink.fn_execution.sdk_worker_main import pipeline_options
-from pyflink.table.types import Row, TinyIntType, SmallIntType, IntType, BigIntType, BooleanType, \
-    FloatType, DoubleType, VarCharType, VarBinaryType, DecimalType, DateType, TimeType, \
-    LocalZonedTimestampType, RowType, RowField, to_arrow_type, TimestampType, ArrayType
+except:
+    from pyflink.fn_execution.beam import beam_slow_coder_impl as coder_impl
+
+__all__ = ['RowCoder', 'BigIntCoder', 'TinyIntCoder', 'BooleanCoder',
+           'SmallIntCoder', 'IntCoder', 'FloatCoder', 'DoubleCoder',
+           'BinaryCoder', 'CharCoder', 'DateCoder', 'TimeCoder',
+           'TimestampCoder', 'ArrayCoder', 'MapCoder', 'DecimalCoder']
 
 FLINK_SCALAR_FUNCTION_SCHEMA_CODER_URN = "flink:coder:schema:scalar_function:v1"
 FLINK_TABLE_FUNCTION_SCHEMA_CODER_URN = "flink:coder:schema:table_function:v1"
 FLINK_SCALAR_FUNCTION_SCHEMA_ARROW_CODER_URN = "flink:coder:schema:scalar_function:arrow:v1"
 
 
-__all__ = ['FlattenRowCoder', 'RowCoder', 'BigIntCoder', 'TinyIntCoder', 'BooleanCoder',
-           'SmallIntCoder', 'IntCoder', 'FloatCoder', 'DoubleCoder',
-           'BinaryCoder', 'CharCoder', 'DateCoder', 'TimeCoder',
-           'TimestampCoder', 'ArrayCoder', 'MapCoder', 'DecimalCoder', 'ArrowCoder']
+class BaseCoder(ABC):
+    def get_impl(self):
+        pass
+
+    @staticmethod
+    def from_schema_proto(schema_proto):
+        pass
 
 
-class TableFunctionRowCoder(FastCoder):
+class TableFunctionRowCoder(BaseCoder):
     """
     Coder for Table Function Row.
     """
+
     def __init__(self, flatten_row_coder):
         self._flatten_row_coder = flatten_row_coder
 
-    def _create_impl(self):
+    def get_impl(self):
         return coder_impl.TableFunctionRowCoderImpl(self._flatten_row_coder.get_impl())
 
-    def to_type_hint(self):
-        return typehints.List
-
-    @Coder.register_urn(FLINK_TABLE_FUNCTION_SCHEMA_CODER_URN, flink_fn_execution_pb2.Schema)
-    def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
-        return TableFunctionRowCoder(FlattenRowCoder([from_proto(f.type)
-                                                      for f in schema_proto.fields]))
+    @staticmethod
+    def from_schema_proto(schema_proto):
+        return TableFunctionRowCoder(FlattenRowCoder.from_schema_proto(schema_proto))
 
     def __repr__(self):
         return 'TableFunctionRowCoder[%s]' % repr(self._flatten_row_coder)
@@ -83,7 +76,7 @@ class TableFunctionRowCoder(FastCoder):
         return hash(self._flatten_row_coder)
 
 
-class FlattenRowCoder(FastCoder):
+class FlattenRowCoder(BaseCoder):
     """
     Coder for Row. The decoded result will be flattened as a list of column values of a row instead
     of a row object.
@@ -92,17 +85,11 @@ class FlattenRowCoder(FastCoder):
     def __init__(self, field_coders):
         self._field_coders = field_coders
 
-    def _create_impl(self):
+    def get_impl(self):
         return coder_impl.FlattenRowCoderImpl([c.get_impl() for c in self._field_coders])
 
-    def is_deterministic(self):
-        return all(c.is_deterministic() for c in self._field_coders)
-
-    def to_type_hint(self):
-        return typehints.List
-
-    @Coder.register_urn(FLINK_SCALAR_FUNCTION_SCHEMA_CODER_URN, flink_fn_execution_pb2.Schema)
-    def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
+    @staticmethod
+    def from_schema_proto(schema_proto):
         return FlattenRowCoder([from_proto(f.type) for f in schema_proto.fields])
 
     def __repr__(self):
@@ -121,46 +108,37 @@ class FlattenRowCoder(FastCoder):
         return hash(self._field_coders)
 
 
-class RowCoder(FlattenRowCoder):
+class FieldCoder(ABC):
+    def get_impl(self):
+        pass
+
+
+class RowCoder(FieldCoder):
     """
     Coder for Row.
     """
 
     def __init__(self, field_coders):
-        super(RowCoder, self).__init__(field_coders)
-
-    def _create_impl(self):
-        return coder_impl.RowCoderImpl([c.get_impl() for c in self._field_coders])
+        self._field_coders = field_coders
 
     def get_impl(self):
-        return self._create_impl()
-
-    def to_type_hint(self):
-        return Row
+        return coder_impl.RowCoderImpl([c.get_impl() for c in self._field_coders])
 
     def __repr__(self):
         return 'RowCoder[%s]' % ', '.join(str(c) for c in self._field_coders)
 
 
-class CollectionCoder(FastCoder):
+class CollectionCoder(FieldCoder):
     """
     Base coder for collection.
     """
+
     def __init__(self, elem_coder):
         self._elem_coder = elem_coder
 
-    def _create_impl(self):
-        raise NotImplementedError
-
-    def get_impl(self):
-        return self._create_impl()
-
     def is_deterministic(self):
         return self._elem_coder.is_deterministic()
 
-    def to_type_hint(self):
-        return []
-
     def __eq__(self, other):
         return (self.__class__ == other.__class__
                 and self._elem_coder == other._elem_coder)
@@ -184,11 +162,11 @@ class ArrayCoder(CollectionCoder):
         self._elem_coder = elem_coder
         super(ArrayCoder, self).__init__(elem_coder)
 
-    def _create_impl(self):
+    def get_impl(self):
         return coder_impl.ArrayCoderImpl(self._elem_coder.get_impl())
 
 
-class MapCoder(FastCoder):
+class MapCoder(FieldCoder):
     """
     Coder for Map.
     """
@@ -197,18 +175,12 @@ class MapCoder(FastCoder):
         self._key_coder = key_coder
         self._value_coder = value_coder
 
-    def _create_impl(self):
-        return coder_impl.MapCoderImpl(self._key_coder.get_impl(), self._value_coder.get_impl())
-
     def get_impl(self):
-        return self._create_impl()
+        return coder_impl.MapCoderImpl(self._key_coder.get_impl(), self._value_coder.get_impl())
 
     def is_deterministic(self):
         return self._key_coder.is_deterministic() and self._value_coder.is_deterministic()
 
-    def to_type_hint(self):
-        return {}
-
     def __repr__(self):
         return 'MapCoder[%s]' % ','.join([repr(self._key_coder), repr(self._value_coder)])
 
@@ -224,103 +196,70 @@ class MapCoder(FastCoder):
         return hash([self._key_coder, self._value_coder])
 
 
-class DeterministicCoder(FastCoder, ABC):
-    """
-    Base Coder for all deterministic Coders.
-    """
-
-    def is_deterministic(self):
-        return True
-
-    def get_impl(self):
-        return self._create_impl()
-
-
-class BigIntCoder(DeterministicCoder):
+class BigIntCoder(FieldCoder):
     """
     Coder for 8 bytes long.
     """
 
-    def _create_impl(self):
+    def get_impl(self):
         return coder_impl.BigIntCoderImpl()
 
-    def to_type_hint(self):
-        return int
 
-
-class TinyIntCoder(DeterministicCoder):
+class TinyIntCoder(FieldCoder):
     """
     Coder for Byte.
     """
 
-    def _create_impl(self):
+    def get_impl(self):
         return coder_impl.TinyIntCoderImpl()
 
-    def to_type_hint(self):
-        return int
-
 
-class BooleanCoder(DeterministicCoder):
+class BooleanCoder(FieldCoder):
     """
     Coder for Boolean.
     """
 
-    def _create_impl(self):
+    def get_impl(self):
         return coder_impl.BooleanCoderImpl()
 
-    def to_type_hint(self):
-        return bool
-
 
-class SmallIntCoder(DeterministicCoder):
+class SmallIntCoder(FieldCoder):
     """
     Coder for Short.
     """
 
-    def _create_impl(self):
+    def get_impl(self):
         return coder_impl.SmallIntCoderImpl()
 
-    def to_type_hint(self):
-        return int
 
-
-class IntCoder(DeterministicCoder):
+class IntCoder(FieldCoder):
     """
     Coder for 4 bytes int.
     """
 
-    def _create_impl(self):
+    def get_impl(self):
         return coder_impl.IntCoderImpl()
 
-    def to_type_hint(self):
-        return int
 
-
-class FloatCoder(DeterministicCoder):
+class FloatCoder(FieldCoder):
     """
     Coder for Float.
     """
 
-    def _create_impl(self):
+    def get_impl(self):
         return coder_impl.FloatCoderImpl()
 
-    def to_type_hint(self):
-        return float
-
 
-class DoubleCoder(DeterministicCoder):
+class DoubleCoder(FieldCoder):
     """
     Coder for Double.
     """
 
-    def _create_impl(self):
+    def get_impl(self):
         return coder_impl.DoubleCoderImpl()
 
-    def to_type_hint(self):
-        return float
-
 
-class DecimalCoder(DeterministicCoder):
+class DecimalCoder(FieldCoder):
     """
     Coder for Decimal.
     """
@@ -329,61 +268,47 @@ class DecimalCoder(DeterministicCoder):
         self.precision = precision
         self.scale = scale
 
-    def _create_impl(self):
+    def get_impl(self):
         return coder_impl.DecimalCoderImpl(self.precision, self.scale)
 
-    def to_type_hint(self):
-        return decimal.Decimal
-
 
-class BinaryCoder(DeterministicCoder):
+class BinaryCoder(FieldCoder):
     """
     Coder for Byte Array.
     """
 
-    def _create_impl(self):
+    def get_impl(self):
         return coder_impl.BinaryCoderImpl()
 
-    def to_type_hint(self):
-        return bytes
 
-
-class CharCoder(DeterministicCoder):
+class CharCoder(FieldCoder):
     """
     Coder for Character String.
     """
-    def _create_impl(self):
-        return coder_impl.CharCoderImpl()
 
-    def to_type_hint(self):
-        return str
+    def get_impl(self):
+        return coder_impl.CharCoderImpl()
 
 
-class DateCoder(DeterministicCoder):
+class DateCoder(FieldCoder):
     """
     Coder for Date
     """
 
-    def _create_impl(self):
+    def get_impl(self):
         return coder_impl.DateCoderImpl()
 
-    def to_type_hint(self):
-        return datetime.date
 
-
-class TimeCoder(DeterministicCoder):
+class TimeCoder(FieldCoder):
     """
     Coder for Time.
     """
 
-    def _create_impl(self):
+    def get_impl(self):
         return coder_impl.TimeCoderImpl()
 
-    def to_type_hint(self):
-        return datetime.time
 
-
-class TimestampCoder(DeterministicCoder):
+class TimestampCoder(FieldCoder):
     """
     Coder for Timestamp.
     """
@@ -391,14 +316,11 @@ class TimestampCoder(DeterministicCoder):
     def __init__(self, precision):
         self.precision = precision
 
-    def _create_impl(self):
+    def get_impl(self):
         return coder_impl.TimestampCoderImpl(self.precision)
 
-    def to_type_hint(self):
-        return datetime.datetime
-
 
-class LocalZonedTimestampCoder(DeterministicCoder):
+class LocalZonedTimestampCoder(FieldCoder):
     """
     Coder for LocalZonedTimestamp.
     """
@@ -407,108 +329,9 @@ class LocalZonedTimestampCoder(DeterministicCoder):
         self.precision = precision
         self.timezone = timezone
 
-    def _create_impl(self):
+    def get_impl(self):
         return coder_impl.LocalZonedTimestampCoderImpl(self.precision, self.timezone)
 
-    def to_type_hint(self):
-        return datetime.datetime
-
-
-class ArrowCoder(DeterministicCoder):
-    """
-    Coder for Arrow.
-    """
-    def __init__(self, schema, row_type, timezone):
-        self._schema = schema
-        self._row_type = row_type
-        self._timezone = timezone
-
-    def _create_impl(self):
-        return slow_coder_impl.ArrowCoderImpl(self._schema, self._row_type, self._timezone)
-
-    def to_type_hint(self):
-        import pandas as pd
-        return pd.Series
-
-    @Coder.register_urn(FLINK_SCALAR_FUNCTION_SCHEMA_ARROW_CODER_URN,
-                        flink_fn_execution_pb2.Schema)
-    def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
-        def _to_arrow_schema(row_type):
-            return pa.schema([pa.field(n, to_arrow_type(t), t._nullable)
-                              for n, t in zip(row_type.field_names(), row_type.field_types())])
-
-        def _to_data_type(field_type):
-            if field_type.type_name == flink_fn_execution_pb2.Schema.TINYINT:
-                return TinyIntType(field_type.nullable)
-            elif field_type.type_name == flink_fn_execution_pb2.Schema.SMALLINT:
-                return SmallIntType(field_type.nullable)
-            elif field_type.type_name == flink_fn_execution_pb2.Schema.INT:
-                return IntType(field_type.nullable)
-            elif field_type.type_name == flink_fn_execution_pb2.Schema.BIGINT:
-                return BigIntType(field_type.nullable)
-            elif field_type.type_name == flink_fn_execution_pb2.Schema.BOOLEAN:
-                return BooleanType(field_type.nullable)
-            elif field_type.type_name == flink_fn_execution_pb2.Schema.FLOAT:
-                return FloatType(field_type.nullable)
-            elif field_type.type_name == flink_fn_execution_pb2.Schema.DOUBLE:
-                return DoubleType(field_type.nullable)
-            elif field_type.type_name == flink_fn_execution_pb2.Schema.VARCHAR:
-                return VarCharType(0x7fffffff, field_type.nullable)
-            elif field_type.type_name == flink_fn_execution_pb2.Schema.VARBINARY:
-                return VarBinaryType(0x7fffffff, field_type.nullable)
-            elif field_type.type_name == flink_fn_execution_pb2.Schema.DECIMAL:
-                return DecimalType(field_type.decimal_info.precision,
-                                   field_type.decimal_info.scale,
-                                   field_type.nullable)
-            elif field_type.type_name == flink_fn_execution_pb2.Schema.DATE:
-                return DateType(field_type.nullable)
-            elif field_type.type_name == flink_fn_execution_pb2.Schema.TIME:
-                return TimeType(field_type.time_info.precision, field_type.nullable)
-            elif field_type.type_name == \
-                    flink_fn_execution_pb2.Schema.LOCAL_ZONED_TIMESTAMP:
-                return LocalZonedTimestampType(field_type.local_zoned_timestamp_info.precision,
-                                               field_type.nullable)
-            elif field_type.type_name == flink_fn_execution_pb2.Schema.TIMESTAMP:
-                return TimestampType(field_type.timestamp_info.precision, field_type.nullable)
-            elif field_type.type_name == flink_fn_execution_pb2.Schema.ARRAY:
-                return ArrayType(_to_data_type(field_type.collection_element_type),
-                                 field_type.nullable)
-            elif field_type.type_name == flink_fn_execution_pb2.Schema.TypeName.ROW:
-                return RowType(
-                    [RowField(f.name, _to_data_type(f.type), f.description)
-                     for f in field_type.row_schema.fields], field_type.nullable)
-            else:
-                raise ValueError("field_type %s is not supported." % field_type)
-
-        def _to_row_type(row_schema):
-            return RowType([RowField(f.name, _to_data_type(f.type)) for f in row_schema.fields])
-
-        timezone = pytz.timezone(pipeline_options.view_as(DebugOptions).lookup_experiment(
-            "table.exec.timezone"))
-        row_type = _to_row_type(schema_proto)
-        return ArrowCoder(_to_arrow_schema(row_type), row_type, timezone)
-
-    def __repr__(self):
-        return 'ArrowCoder[%s]' % self._schema
-
-
-class PassThroughLengthPrefixCoder(LengthPrefixCoder):
-    """
-    Coder which doesn't prefix the length of the encoded object as the length prefix will be handled
-    by the wrapped value coder.
-    """
-    def __init__(self, value_coder):
-        super(PassThroughLengthPrefixCoder, self).__init__(value_coder)
-
-    def _create_impl(self):
-        return coder_impl.PassThroughLengthPrefixCoderImpl(self._value_coder.get_impl())
-
-    def __repr__(self):
-        return 'PassThroughLengthPrefixCoder[%s]' % self._value_coder
-
-
-Coder.register_structured_urn(
-    common_urns.coders.LENGTH_PREFIX.urn, PassThroughLengthPrefixCoder)
 
 type_name = flink_fn_execution_pb2.Schema
 _type_name_mappings = {
@@ -544,8 +367,7 @@ def from_proto(field_type):
     if field_type_name == type_name.TIMESTAMP:
         return TimestampCoder(field_type.timestamp_info.precision)
     if field_type_name == type_name.LOCAL_ZONED_TIMESTAMP:
-        timezone = pytz.timezone(pipeline_options.view_as(DebugOptions).lookup_experiment(
-            "table.exec.timezone"))
+        timezone = pytz.timezone(os.environ['table.exec.timezone'])
         return LocalZonedTimestampCoder(field_type.local_zoned_timestamp_info.precision, timezone)
     elif field_type_name == type_name.ARRAY:
         return ArrayCoder(from_proto(field_type.collection_element_type))
diff --git a/flink-python/pyflink/fn_execution/fast_coder_impl.pxd b/flink-python/pyflink/fn_execution/fast_coder_impl.pxd
index 2fc7266..3d53e53 100644
--- a/flink-python/pyflink/fn_execution/fast_coder_impl.pxd
+++ b/flink-python/pyflink/fn_execution/fast_coder_impl.pxd
@@ -19,54 +19,23 @@
 
 cimport libc.stdint
 
-from apache_beam.coders.coder_impl cimport StreamCoderImpl, OutputStream, InputStream
-
-# InputStreamAndFunctionWrapper wraps the user-defined function
-# and input_stream_wrapper in operations
-cdef class InputStreamAndFunctionWrapper:
-    # user-defined function
-    cdef readonly object func
-    cdef InputStreamWrapper input_stream_wrapper
-
-# InputStreamWrapper wraps input_stream and related infos used to decode data
-cdef class InputStreamWrapper:
-    cdef InputStream input_stream
-    cdef list input_field_coders
-    cdef TypeName*input_field_type
-    cdef CoderType*input_coder_type
-    cdef libc.stdint.int32_t input_field_count
-    cdef libc.stdint.int32_t input_leading_complete_bytes_num
-    cdef libc.stdint.int32_t input_remaining_bits_num
-    cdef size_t input_buffer_size
-
-cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
-    cdef readonly StreamCoderImpl _value_coder
-
-cdef class FlattenRowCoderImpl(StreamCoderImpl):
-    # the input field coders and related args used to decode input_stream data
-    cdef list _input_field_coders
-    cdef TypeName*_input_field_type
-    cdef CoderType*_input_coder_type
-    cdef libc.stdint.int32_t _input_field_count
-    cdef libc.stdint.int32_t _input_leading_complete_bytes_num
-    cdef libc.stdint.int32_t _input_remaining_bits_num
-
-    # the output field coders and related args used to encode data to output_stream
-    cdef readonly list _output_field_coders
-    cdef TypeName*_output_field_type
-    cdef CoderType*_output_coder_type
-    cdef libc.stdint.int32_t _output_field_count
-    cdef libc.stdint.int32_t _output_leading_complete_bytes_num
-    cdef libc.stdint.int32_t _output_remaining_bits_num
+from pyflink.fn_execution.stream cimport InputStream, OutputStream
+
+cdef class BaseCoderImpl:
+    cpdef void encode(self, value, OutputStream output_stream)
+    cpdef decode(self, InputStream input_stream)
+
+cdef class FlattenRowCoderImpl(BaseCoderImpl):
+    cdef readonly list _field_coders
+    cdef TypeName*_field_type
+    cdef CoderType*_field_coder_type
+    cdef size_t _field_count
+    cdef size_t _leading_complete_bytes_num
+    cdef size_t _remaining_bits_num
 
     cdef bint*_null_mask
     cdef unsigned char*_null_byte_search_table
 
-    # the char pointer used to store encoded data of output_stream
-    cdef char*_output_data
-    cdef size_t _output_buffer_size
-    cdef size_t _output_pos
-
     # the tmp char pointer used to store encoded data of every row
     cdef char*_tmp_output_data
     cdef size_t _tmp_output_buffer_size
@@ -75,40 +44,24 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
     # the char pointer used to map the decoded data of input_stream
     cdef char*_input_data
     cdef size_t _input_pos
-    cdef size_t _input_buffer_size
 
     # used to store the result of Python user-defined function
     cdef list row
 
-    # the Python user-defined function
-    cdef object func
-
     # initial attribute
     cdef void _init_attribute(self)
 
-    # wrap input_stream
-    cdef InputStreamWrapper _wrap_input_stream(self, InputStream input_stream, size_t size)
-
-    cdef void _write_null_mask(self, value, libc.stdint.int32_t leading_complete_bytes_num,
-                               libc.stdint.int32_t remaining_bits_num)
-    cdef void _read_null_mask(self, bint*null_mask, libc.stdint.int32_t leading_complete_bytes_num,
-                              libc.stdint.int32_t remaining_bits_num)
-
-    cdef void _prepare_encode(self, InputStreamAndFunctionWrapper input_stream_and_function_wrapper,
-                              OutputStream out_stream)
-
-    cdef void _maybe_flush(self, OutputStream out_stream)
-    # Because output_buffer will be reallocated during encoding data, we need to remap output_buffer
-    # to the data pointer of output_stream
-    cdef void _map_output_data_to_output_stream(self, OutputStream out_stream)
-    cdef void _copy_to_output_buffer(self)
+    cdef void _write_null_mask(self, value, size_t leading_complete_bytes_num,
+                               size_t remaining_bits_num)
+    cdef void _read_null_mask(self, bint*null_mask, size_t leading_complete_bytes_num,
+                              size_t remaining_bits_num)
 
     # encode data to output_stream
-    cdef void _encode_one_row(self, value)
-    cdef void _encode_field(self, CoderType coder_type, TypeName field_type, BaseCoder field_coder,
-                          item)
+    cdef void _encode_one_row(self, value, OutputStream output_stream)
+    cdef void _encode_field(self, CoderType coder_type, TypeName field_type, FieldCoder field_coder,
+                            item)
     cdef void _encode_field_simple(self, TypeName field_type, item)
-    cdef void _encode_field_complex(self, TypeName field_type, BaseCoder field_coder, item)
+    cdef void _encode_field_complex(self, TypeName field_type, FieldCoder field_coder, item)
     cdef void _extend(self, size_t missing)
     cdef void _encode_byte(self, unsigned char val)
     cdef void _encode_smallint(self, libc.stdint.int16_t v)
@@ -119,10 +72,11 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
     cdef void _encode_bytes(self, char*b, size_t length)
 
     # decode data from input_stream
-    cdef void _decode_next_row(self)
-    cdef object _decode_field(self, CoderType coder_type, TypeName field_type, BaseCoder field_coder)
+    cdef void _decode_next_row(self, InputStream input_stream)
+    cdef object _decode_field(self, CoderType coder_type, TypeName field_type,
+                              FieldCoder field_coder)
     cdef object _decode_field_simple(self, TypeName field_type)
-    cdef object _decode_field_complex(self, TypeName field_type, BaseCoder field_coder)
+    cdef object _decode_field_complex(self, TypeName field_type, FieldCoder field_coder)
     cdef unsigned char _decode_byte(self) except? -1
     cdef libc.stdint.int16_t _decode_smallint(self) except? -1
     cdef libc.stdint.int32_t _decode_int(self) except? -1
@@ -132,7 +86,7 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
     cdef bytes _decode_bytes(self)
 
 cdef class TableFunctionRowCoderImpl(FlattenRowCoderImpl):
-    cdef void _encode_end_message(self)
+    cdef char* _end_message
 
 cdef enum CoderType:
     UNDEFINED = -1
@@ -159,59 +113,59 @@ cdef enum TypeName:
     MAP = 15
     LOCAL_ZONED_TIMESTAMP = 16
 
-cdef class BaseCoder:
+cdef class FieldCoder:
     cpdef CoderType coder_type(self)
     cpdef TypeName type_name(self)
 
-cdef class TinyIntCoderImpl(BaseCoder):
+cdef class TinyIntCoderImpl(FieldCoder):
     pass
 
-cdef class SmallIntCoderImpl(BaseCoder):
+cdef class SmallIntCoderImpl(FieldCoder):
     pass
 
-cdef class IntCoderImpl(BaseCoder):
+cdef class IntCoderImpl(FieldCoder):
     pass
 
-cdef class BigIntCoderImpl(BaseCoder):
+cdef class BigIntCoderImpl(FieldCoder):
     pass
 
-cdef class BooleanCoderImpl(BaseCoder):
+cdef class BooleanCoderImpl(FieldCoder):
     pass
 
-cdef class FloatCoderImpl(BaseCoder):
+cdef class FloatCoderImpl(FieldCoder):
     pass
 
-cdef class DoubleCoderImpl(BaseCoder):
+cdef class DoubleCoderImpl(FieldCoder):
     pass
 
-cdef class BinaryCoderImpl(BaseCoder):
+cdef class BinaryCoderImpl(FieldCoder):
     pass
 
-cdef class CharCoderImpl(BaseCoder):
+cdef class CharCoderImpl(FieldCoder):
     pass
 
-cdef class DateCoderImpl(BaseCoder):
+cdef class DateCoderImpl(FieldCoder):
     pass
 
-cdef class TimeCoderImpl(BaseCoder):
+cdef class TimeCoderImpl(FieldCoder):
     pass
 
-cdef class DecimalCoderImpl(BaseCoder):
+cdef class DecimalCoderImpl(FieldCoder):
     cdef readonly object context
     cdef readonly object scale_format
 
-cdef class TimestampCoderImpl(BaseCoder):
+cdef class TimestampCoderImpl(FieldCoder):
     cdef readonly bint is_compact
 
 cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl):
     cdef readonly object timezone
 
-cdef class ArrayCoderImpl(BaseCoder):
-    cdef readonly BaseCoder elem_coder
+cdef class ArrayCoderImpl(FieldCoder):
+    cdef readonly FieldCoder elem_coder
 
-cdef class MapCoderImpl(BaseCoder):
-    cdef readonly BaseCoder key_coder
-    cdef readonly BaseCoder value_coder
+cdef class MapCoderImpl(FieldCoder):
+    cdef readonly FieldCoder key_coder
+    cdef readonly FieldCoder value_coder
 
-cdef class RowCoderImpl(BaseCoder):
+cdef class RowCoderImpl(FieldCoder):
     cdef readonly list field_coders
diff --git a/flink-python/pyflink/fn_execution/fast_coder_impl.pyx b/flink-python/pyflink/fn_execution/fast_coder_impl.pyx
index eb189fd..38481bc 100644
--- a/flink-python/pyflink/fn_execution/fast_coder_impl.pyx
+++ b/flink-python/pyflink/fn_execution/fast_coder_impl.pyx
@@ -20,90 +20,62 @@
 # cython: profile=True
 # cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True
 
-cimport libc.stdlib
+from libc.stdlib cimport free, malloc, realloc
+from libc.string cimport memcpy
 
 import datetime
 import decimal
 from pyflink.table import Row
 
-cdef class InputStreamAndFunctionWrapper:
-    def __cinit__(self, func, input_stream_wrapper):
-        self.func = func
-        self.input_stream_wrapper = input_stream_wrapper
+cdef class BaseCoderImpl:
+    cpdef void encode(self, value, OutputStream output_stream):
+        pass
 
-cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
-    def __cinit__(self, value_coder):
-        self._value_coder = value_coder
-
-    cpdef encode_to_stream(self, value, OutputStream out_stream, bint nested):
-        self._value_coder.encode_to_stream(value, out_stream, nested)
-
-    cpdef decode_from_stream(self, InputStream in_stream, bint nested):
-        return self._value_coder.decode_from_stream(in_stream, nested)
-
-    cpdef get_estimated_size_and_observables(self, value, bint nested=False):
-        return 0, []
+    cpdef decode(self, InputStream input_stream):
+        pass
 
 cdef class TableFunctionRowCoderImpl(FlattenRowCoderImpl):
     def __init__(self, flatten_row_coder):
-        super(TableFunctionRowCoderImpl, self).__init__(flatten_row_coder._output_field_coders)
-
-    cpdef encode_to_stream(self, input_stream_and_function_wrapper, OutputStream out_stream,
-                           bint nested):
-        self._prepare_encode(input_stream_and_function_wrapper, out_stream)
-        while self._input_buffer_size > self._input_pos:
-            self._decode_next_row()
-            result = self.func(self.row)
-            if result:
-                for value in result:
-                    if self._output_field_count == 1:
-                        value = (value,)
-                    self._encode_one_row(value)
-                    self._maybe_flush(out_stream)
-            self._encode_end_message()
-
-        self._map_output_data_to_output_stream(out_stream)
-
-    # write 0x00 as end message
-    cdef void _encode_end_message(self):
-        if self._output_buffer_size < self._output_pos + 2:
-            self._extend(2)
-        self._output_data[self._output_pos] = 0x01
-        self._output_data[self._output_pos + 1] = 0x00
-        self._output_pos += 2
+        super(TableFunctionRowCoderImpl, self).__init__(flatten_row_coder._field_coders)
+        self._end_message = <char*> malloc(1)
+        self._end_message[0] = 0x00
+
+    cpdef void encode(self, iter_value, OutputStream output_stream):
+        if iter_value:
+            for value in iter_value:
+                if self._field_count == 1:
+                    value = (value,)
+                self._encode_one_row(value, output_stream)
+        # write 0x00 as end message
+        output_stream.write(self._end_message, 1)
 
-cdef class FlattenRowCoderImpl(StreamCoderImpl):
+    def __dealloc__(self):
+        if self._end_message:
+            free(self._end_message)
+
+cdef class FlattenRowCoderImpl(BaseCoderImpl):
     def __init__(self, field_coders):
-        self._output_field_coders = field_coders
-        self._output_field_count = len(self._output_field_coders)
-        self._output_field_type = <TypeName*> libc.stdlib.malloc(
-            self._output_field_count * sizeof(TypeName))
-        self._output_coder_type = <CoderType*> libc.stdlib.malloc(
-            self._output_field_count * sizeof(CoderType))
-        self._output_leading_complete_bytes_num = self._output_field_count // 8
-        self._output_remaining_bits_num = self._output_field_count % 8
+        self._field_coders = field_coders
+        self._field_count = len(self._field_coders)
+        self._field_type = <TypeName*> malloc(self._field_count * sizeof(TypeName))
+        self._field_coder_type = <CoderType*> malloc(
+            self._field_count * sizeof(CoderType))
+        self._leading_complete_bytes_num = self._field_count // 8
+        self._remaining_bits_num = self._field_count % 8
         self._tmp_output_buffer_size = 1024
         self._tmp_output_pos = 0
-        self._tmp_output_data = <char*> libc.stdlib.malloc(self._tmp_output_buffer_size)
-        self._null_byte_search_table = <unsigned char*> libc.stdlib.malloc(
-            8 * sizeof(unsigned char))
+        self._tmp_output_data = <char*> malloc(self._tmp_output_buffer_size)
+        self._null_byte_search_table = <unsigned char*> malloc(8 * sizeof(unsigned char))
+        self._null_mask = <bint*> malloc(self._field_count * sizeof(bint))
         self._init_attribute()
+        self.row = [None for _ in range(self._field_count)]
+
+    cpdef void encode(self, value, OutputStream output_stream):
+        self._encode_one_row(value, output_stream)
 
-    cpdef decode_from_stream(self, InputStream in_stream, bint nested):
-        cdef InputStreamWrapper input_stream_wrapper
-        input_stream_wrapper = self._wrap_input_stream(in_stream, in_stream.size())
-        return input_stream_wrapper
-
-    cpdef encode_to_stream(self, input_stream_and_function_wrapper, OutputStream out_stream,
-                           bint nested):
-        cdef list result
-        self._prepare_encode(input_stream_and_function_wrapper, out_stream)
-        while self._input_buffer_size > self._input_pos:
-            self._decode_next_row()
-            result = self.func(self.row)
-            self._encode_one_row(result)
-            self._maybe_flush(out_stream)
-        self._map_output_data_to_output_stream(out_stream)
+    cpdef decode(self, InputStream input_stream):
+        self._decode_next_row(input_stream)
+        return self.row
 
     cdef void _init_attribute(self):
         self._null_byte_search_table[0] = 0x80
@@ -114,46 +86,28 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
         self._null_byte_search_table[5] = 0x04
         self._null_byte_search_table[6] = 0x02
         self._null_byte_search_table[7] = 0x01
-        for i in range(self._output_field_count):
-            self._output_field_type[i] = self._output_field_coders[i].type_name()
-            self._output_coder_type[i] = self._output_field_coders[i].coder_type()
-
-    cdef InputStreamWrapper _wrap_input_stream(self, InputStream input_stream, size_t size):
-        # wrappers the input field coders and input_stream together
-        # so that it can be transposed to operations
-        cdef InputStreamWrapper input_stream_wrapper
-        input_stream_wrapper = InputStreamWrapper()
-        input_stream_wrapper.input_stream = input_stream
-        input_stream_wrapper.input_field_coders = self._output_field_coders
-        input_stream_wrapper.input_remaining_bits_num = self._output_remaining_bits_num
-        input_stream_wrapper.input_leading_complete_bytes_num = \
-            self._output_leading_complete_bytes_num
-        input_stream_wrapper.input_field_count = self._output_field_count
-        input_stream_wrapper.input_field_type = self._output_field_type
-        input_stream_wrapper.input_coder_type = self._output_coder_type
-        input_stream_wrapper.input_stream.pos = size
-        input_stream_wrapper.input_buffer_size = size
-        return input_stream_wrapper
-
-    cdef void _encode_one_row(self, value):
-        cdef libc.stdint.int32_t i
-        self._write_null_mask(value, self._output_leading_complete_bytes_num,
-                              self._output_remaining_bits_num)
-        for i in range(self._output_field_count):
+        for i in range(self._field_count):
+            self._field_type[i] = self._field_coders[i].type_name()
+            self._field_coder_type[i] = self._field_coders[i].coder_type()
+
+    cdef void _encode_one_row(self, value, OutputStream output_stream):
+        cdef size_t i
+        self._write_null_mask(value, self._leading_complete_bytes_num, self._remaining_bits_num)
+        for i in range(self._field_count):
             item = value[i]
             if item is not None:
-                if self._output_coder_type[i] == SIMPLE:
-                    self._encode_field_simple(self._output_field_type[i], item)
+                if self._field_coder_type[i] == SIMPLE:
+                    self._encode_field_simple(self._field_type[i], item)
                 else:
-                    self._encode_field_complex(self._output_field_type[i],
-                                               self._output_field_coders[i], item)
+                    self._encode_field_complex(self._field_type[i], self._field_coders[i], item)
 
-        self._copy_to_output_buffer()
+        output_stream.write(self._tmp_output_data, self._tmp_output_pos)
+        self._tmp_output_pos = 0
 
     cdef void _read_null_mask(self, bint*null_mask,
-                              libc.stdint.int32_t input_leading_complete_bytes_num,
-                              libc.stdint.int32_t input_remaining_bits_num):
-        cdef libc.stdint.int32_t field_pos, i
+                              size_t input_leading_complete_bytes_num,
+                              size_t input_remaining_bits_num):
+        cdef size_t field_pos, i
         cdef unsigned char b
         field_pos = 0
         for _ in range(input_leading_complete_bytes_num):
@@ -170,25 +124,25 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
                 null_mask[field_pos] = (b & self._null_byte_search_table[i]) > 0
                 field_pos += 1
 
-    cdef void _decode_next_row(self):
-        cdef libc.stdint.int32_t i
-        # skip prefix variable int length
-        while self._input_data[self._input_pos] & 0x80:
-            self._input_pos += 1
-        self._input_pos += 1
-        self._read_null_mask(self._null_mask, self._input_leading_complete_bytes_num,
-                             self._input_remaining_bits_num)
-        for i in range(self._input_field_count):
+    cdef void _decode_next_row(self, InputStream input_stream):
+        cdef size_t i
+        cdef size_t length
+        length = input_stream.read(&self._input_data)
+        self._input_pos = 0
+        self._read_null_mask(self._null_mask, self._leading_complete_bytes_num,
+                             self._remaining_bits_num)
+        for i in range(self._field_count):
             if self._null_mask[i]:
                 self.row[i] = None
             else:
-                if self._input_coder_type[i] == SIMPLE:
-                    self.row[i] = self._decode_field_simple(self._input_field_type[i])
+                if self._field_coder_type[i] == SIMPLE:
+                    self.row[i] = self._decode_field_simple(self._field_type[i])
                 else:
-                    self.row[i] = self._decode_field_complex(self._input_field_type[i],
-                                                             self._input_field_coders[i])
+                    self.row[i] = self._decode_field_complex(self._field_type[i],
+                                                             self._field_coders[i])
 
-    cdef object _decode_field(self, CoderType coder_type, TypeName field_type, BaseCoder field_coder):
+    cdef object _decode_field(self, CoderType coder_type, TypeName field_type,
+                              FieldCoder field_coder):
         if coder_type == SIMPLE:
             return self._decode_field_simple(field_type)
         else:
@@ -240,12 +194,12 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
             minutes %= 60
             return datetime.time(hours, minutes, seconds, milliseconds * 1000)
 
-    cdef object _decode_field_complex(self, TypeName field_type, BaseCoder field_coder):
+    cdef object _decode_field_complex(self, TypeName field_type, FieldCoder field_coder):
         cdef libc.stdint.int32_t nanoseconds, microseconds, seconds, length
         cdef libc.stdint.int32_t i, row_field_count, leading_complete_bytes_num, remaining_bits_num
         cdef libc.stdint.int64_t milliseconds
         cdef bint*null_mask
-        cdef BaseCoder value_coder, key_coder
+        cdef FieldCoder value_coder, key_coder
         cdef TypeName value_type, key_type
         cdef CoderType value_coder_type, key_coder_type
         cdef list row_field_coders
@@ -287,8 +241,9 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
             value_coder = (<ArrayCoderImpl> field_coder).elem_coder
             value_type = value_coder.type_name()
             value_coder_type = value_coder.coder_type()
-            return [self._decode_field(value_coder_type, value_type, value_coder) if self._decode_byte()
-                    else None for _ in range(length)]
+            return [
+                self._decode_field(value_coder_type, value_type, value_coder) if self._decode_byte()
+                else None for _ in range(length)]
         elif field_type == MAP:
             # Map
             key_coder = (<MapCoderImpl> field_coder).key_coder
@@ -310,7 +265,7 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
             # Row
             row_field_coders = (<RowCoderImpl> field_coder).field_coders
             row_field_count = len(row_field_coders)
-            null_mask = <bint*> libc.stdlib.malloc(row_field_count * sizeof(bint))
+            null_mask = <bint*> malloc(row_field_count * sizeof(bint))
             leading_complete_bytes_num = row_field_count // 8
             remaining_bits_num = row_field_count % 8
             self._read_null_mask(null_mask, leading_complete_bytes_num, remaining_bits_num)
@@ -320,7 +275,7 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
                             row_field_coders[i].type_name(),
                             row_field_coders[i])
                         for i in range(row_field_count)])
-            libc.stdlib.free(null_mask)
+            free(null_mask)
             return row
 
     cdef unsigned char _decode_byte(self) except? -1:
@@ -365,36 +320,8 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
         self._input_pos += size
         return self._input_data[self._input_pos - size: self._input_pos]
 
-    cdef void _prepare_encode(self, InputStreamAndFunctionWrapper input_stream_and_function_wrapper,
-                              OutputStream out_stream):
-        cdef InputStreamWrapper input_stream_wrapper
-        # get the data pointer of output_stream
-        self._output_data = out_stream.data
-        self._output_pos = out_stream.pos
-        self._output_buffer_size = out_stream.buffer_size
-        self._tmp_output_pos = 0
-
-        input_stream_wrapper = input_stream_and_function_wrapper.input_stream_wrapper
-        # get the data pointer of input_stream
-        self._input_data = input_stream_wrapper.input_stream.allc
-        self._input_buffer_size = input_stream_wrapper.input_buffer_size
-
-        # get the infos of input coder which will be used to decode data from input_stream
-        self._input_field_count = input_stream_wrapper.input_field_count
-        self._input_leading_complete_bytes_num = input_stream_wrapper.input_leading_complete_bytes_num
-        self._input_remaining_bits_num = input_stream_wrapper.input_remaining_bits_num
-        self._input_field_type = input_stream_wrapper.input_field_type
-        self._input_coder_type = input_stream_wrapper.input_coder_type
-        self._input_field_coders = input_stream_wrapper.input_field_coders
-        self._null_mask = <bint*> libc.stdlib.malloc(self._input_field_count * sizeof(bint))
-        self._input_pos = 0
-
-        # initial the result row and get the Python user-defined function
-        self.row = [None for _ in range(self._input_field_count)]
-        self.func = input_stream_and_function_wrapper.func
-
-    cdef void _encode_field(self, CoderType coder_type, TypeName field_type, BaseCoder field_coder,
-                          item):
+    cdef void _encode_field(self, CoderType coder_type, TypeName field_type, FieldCoder field_coder,
+                            item):
         if coder_type == SIMPLE:
             self._encode_field_simple(field_type, item)
         else:
@@ -445,14 +372,14 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
             milliseconds = hour * 3600000 + minute * 60000 + seconds * 1000 + microsecond // 1000
             self._encode_int(milliseconds)
 
-    cdef void _encode_field_complex(self, TypeName field_type, BaseCoder field_coder, item):
+    cdef void _encode_field_complex(self, TypeName field_type, FieldCoder field_coder, item):
         cdef libc.stdint.int32_t nanoseconds, microseconds_of_second, length, row_field_count
         cdef libc.stdint.int32_t leading_complete_bytes_num, remaining_bits_num
         cdef libc.stdint.int64_t timestamp_milliseconds, timestamp_seconds
-        cdef BaseCoder value_coder, key_coder
+        cdef FieldCoder value_coder, key_coder
         cdef TypeName value_type, key_type
         cdef CoderType value_coder_type, key_coder_type
-        cdef BaseCoder row_field_coder
+        cdef FieldCoder row_field_coder
         cdef list row_field_coders, row_value
 
         if field_type == DECIMAL:
@@ -522,54 +449,12 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
                 row_field_coder = row_field_coders[i]
                 if field_item is not None:
                     self._encode_field(row_field_coder.coder_type(), row_field_coder.type_name(),
-                                     row_field_coder, field_item)
-
-    cdef void _copy_to_output_buffer(self):
-        cdef size_t size
-        cdef size_t i
-        cdef bint is_realloc
-        cdef char bits
-        # the length of the variable prefix length will be less than 9 bytes
-        if self._output_buffer_size < self._output_pos + self._tmp_output_pos + 9:
-            self._output_buffer_size += self._tmp_output_buffer_size + 9
-            self._output_data = <char*> libc.stdlib.realloc(self._output_data,
-                                                            self._output_buffer_size)
-        size = self._tmp_output_pos
-        # write variable prefix length
-        while size:
-            bits = size & 0x7F
-            size >>= 7
-            if size:
-                bits |= 0x80
-            self._output_data[self._output_pos] = bits
-            self._output_pos += 1
-        if self._tmp_output_pos < 8:
-            # This is faster than memcpy when the string is short.
-            for i in range(self._tmp_output_pos):
-                self._output_data[self._output_pos + i] = self._tmp_output_data[i]
-        else:
-            libc.string.memcpy(self._output_data + self._output_pos, self._tmp_output_data,
-                               self._tmp_output_pos)
-        self._output_pos += self._tmp_output_pos
-        self._tmp_output_pos = 0
-
-    cdef void _maybe_flush(self, OutputStream out_stream):
-        # Currently, it will trigger flushing when the size of buffer reach to 10_000_000
-        if self._output_pos > 10_000_000:
-            self._map_output_data_to_output_stream(out_stream)
-            out_stream.flush()
-            self._output_pos = 0
-
-    cdef void _map_output_data_to_output_stream(self, OutputStream out_stream):
-        out_stream.data = self._output_data
-        out_stream.pos = self._output_pos
-        out_stream.buffer_size = self._output_buffer_size
+                                       row_field_coder, field_item)
 
     cdef void _extend(self, size_t missing):
         while self._tmp_output_buffer_size < self._tmp_output_pos + missing:
             self._tmp_output_buffer_size *= 2
-        self._tmp_output_data = <char*> libc.stdlib.realloc(self._tmp_output_data,
-                                                            self._tmp_output_buffer_size)
+        self._tmp_output_data = <char*> realloc(self._tmp_output_data, self._tmp_output_buffer_size)
 
     cdef void _encode_byte(self, unsigned char val):
         if self._tmp_output_buffer_size < self._tmp_output_pos + 1:
@@ -621,12 +506,12 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
             for i in range(length):
                 self._tmp_output_data[self._tmp_output_pos + i] = b[i]
         else:
-            libc.string.memcpy(self._tmp_output_data + self._tmp_output_pos, b, length)
+            memcpy(self._tmp_output_data + self._tmp_output_pos, b, length)
         self._tmp_output_pos += length
 
-    cdef void _write_null_mask(self, value, libc.stdint.int32_t leading_complete_bytes_num,
-                               libc.stdint.int32_t remaining_bits_num):
-        cdef libc.stdint.int32_t field_pos, index
+    cdef void _write_null_mask(self, value, size_t leading_complete_bytes_num,
+                               size_t remaining_bits_num):
+        cdef size_t field_pos, index
         cdef unsigned char*null_byte_search_table
         cdef unsigned char b, i
         field_pos = 0
@@ -647,95 +532,95 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
             self._encode_byte(b)
 
     def __dealloc__(self):
-        if self.null_mask:
-            libc.stdlib.free(self._null_mask)
-        if self.null_byte_search_table:
-            libc.stdlib.free(self._null_byte_search_table)
+        if self._null_mask:
+            free(self._null_mask)
+        if self._null_byte_search_table:
+            free(self._null_byte_search_table)
         if self._tmp_output_data:
-            libc.stdlib.free(self._tmp_output_data)
-        if self._output_field_type:
-            libc.stdlib.free(self._output_field_type)
-        if self._output_coder_type:
-            libc.stdlib.free(self._output_coder_type)
+            free(self._tmp_output_data)
+        if self._field_type:
+            free(self._field_type)
+        if self._field_coder_type:
+            free(self._field_coder_type)
 
-cdef class BaseCoder:
+cdef class FieldCoder:
     cpdef CoderType coder_type(self):
         return UNDEFINED
 
     cpdef TypeName type_name(self):
         return NONE
 
-cdef class TinyIntCoderImpl(BaseCoder):
+cdef class TinyIntCoderImpl(FieldCoder):
     cpdef CoderType coder_type(self):
         return SIMPLE
 
     cpdef TypeName type_name(self):
         return TINYINT
 
-cdef class SmallIntCoderImpl(BaseCoder):
+cdef class SmallIntCoderImpl(FieldCoder):
     cpdef CoderType coder_type(self):
         return SIMPLE
     cpdef TypeName type_name(self):
         return SMALLINT
 
-cdef class IntCoderImpl(BaseCoder):
+cdef class IntCoderImpl(FieldCoder):
     cpdef CoderType coder_type(self):
         return SIMPLE
 
     cpdef TypeName type_name(self):
         return INT
 
-cdef class BigIntCoderImpl(BaseCoder):
+cdef class BigIntCoderImpl(FieldCoder):
     cpdef CoderType coder_type(self):
         return SIMPLE
     cpdef TypeName type_name(self):
         return BIGINT
 
-cdef class BooleanCoderImpl(BaseCoder):
+cdef class BooleanCoderImpl(FieldCoder):
     cpdef CoderType coder_type(self):
         return SIMPLE
     cpdef TypeName type_name(self):
         return BOOLEAN
 
-cdef class FloatCoderImpl(BaseCoder):
+cdef class FloatCoderImpl(FieldCoder):
     cpdef CoderType coder_type(self):
         return SIMPLE
     cpdef TypeName type_name(self):
         return FLOAT
 
-cdef class DoubleCoderImpl(BaseCoder):
+cdef class DoubleCoderImpl(FieldCoder):
     cpdef CoderType coder_type(self):
         return SIMPLE
     cpdef TypeName type_name(self):
         return DOUBLE
 
-cdef class BinaryCoderImpl(BaseCoder):
+cdef class BinaryCoderImpl(FieldCoder):
     cpdef CoderType coder_type(self):
         return SIMPLE
     cpdef TypeName type_name(self):
         return BINARY
 
-cdef class CharCoderImpl(BaseCoder):
+cdef class CharCoderImpl(FieldCoder):
     cpdef CoderType coder_type(self):
         return SIMPLE
     cpdef TypeName type_name(self):
         return CHAR
 
-cdef class DateCoderImpl(BaseCoder):
+cdef class DateCoderImpl(FieldCoder):
     cpdef CoderType coder_type(self):
         return SIMPLE
 
     cpdef TypeName type_name(self):
         return DATE
 
-cdef class TimeCoderImpl(BaseCoder):
+cdef class TimeCoderImpl(FieldCoder):
     cpdef CoderType coder_type(self):
         return SIMPLE
 
     cpdef TypeName type_name(self):
         return TIME
 
-cdef class DecimalCoderImpl(BaseCoder):
+cdef class DecimalCoderImpl(FieldCoder):
     def __cinit__(self, precision, scale):
         self.context = decimal.Context(prec=precision)
         self.scale_format = decimal.Decimal(10) ** -scale
@@ -746,7 +631,7 @@ cdef class DecimalCoderImpl(BaseCoder):
     cpdef TypeName type_name(self):
         return DECIMAL
 
-cdef class TimestampCoderImpl(BaseCoder):
+cdef class TimestampCoderImpl(FieldCoder):
     def __init__(self, precision):
         self.is_compact = precision <= 3
 
@@ -767,7 +652,7 @@ cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl):
     cpdef TypeName type_name(self):
         return LOCAL_ZONED_TIMESTAMP
 
-cdef class ArrayCoderImpl(BaseCoder):
+cdef class ArrayCoderImpl(FieldCoder):
     def __cinit__(self, elem_coder):
         self.elem_coder = elem_coder
 
@@ -777,7 +662,7 @@ cdef class ArrayCoderImpl(BaseCoder):
     cpdef TypeName type_name(self):
         return ARRAY
 
-cdef class MapCoderImpl(BaseCoder):
+cdef class MapCoderImpl(FieldCoder):
     def __cinit__(self, key_coder, value_coder):
         self.key_coder = key_coder
         self.value_coder = value_coder
@@ -788,7 +673,7 @@ cdef class MapCoderImpl(BaseCoder):
     cpdef TypeName type_name(self):
         return MAP
 
-cdef class RowCoderImpl(BaseCoder):
+cdef class RowCoderImpl(FieldCoder):
     def __cinit__(self, field_coders):
         self.field_coders = field_coders
 
@@ -796,4 +681,4 @@ cdef class RowCoderImpl(BaseCoder):
         return COMPLEX
 
     cpdef TypeName type_name(self):
-        return ROW
\ No newline at end of file
+        return ROW
diff --git a/flink-python/pyflink/fn_execution/fast_operations.pxd b/flink-python/pyflink/fn_execution/fast_operations.pxd
index e551827..ba0e7fe 100644
--- a/flink-python/pyflink/fn_execution/fast_operations.pxd
+++ b/flink-python/pyflink/fn_execution/fast_operations.pxd
@@ -19,11 +19,14 @@
 cimport libc.stdint
 
 from apache_beam.runners.worker.operations cimport Operation
-from apache_beam.coders.coder_impl cimport StreamCoderImpl, CoderImpl, OutputStream, InputStream
+from apache_beam.coders.coder_impl cimport StreamCoderImpl
+
+from pyflink.fn_execution.fast_coder_impl cimport BaseCoderImpl
 
 cdef class StatelessFunctionOperation(Operation):
     cdef Operation consumer
     cdef StreamCoderImpl _value_coder_impl
+    cdef BaseCoderImpl _output_coder
     cdef dict variable_dict
     cdef list user_defined_funcs
     cdef libc.stdint.int32_t _func_num
diff --git a/flink-python/pyflink/fn_execution/fast_operations.pyx b/flink-python/pyflink/fn_execution/fast_operations.pyx
index b7a0a34..25bbbaa 100644
--- a/flink-python/pyflink/fn_execution/fast_operations.pyx
+++ b/flink-python/pyflink/fn_execution/fast_operations.pyx
@@ -25,8 +25,9 @@ import cloudpickle
 from apache_beam.runners.worker import bundle_processor
 from apache_beam.runners.worker import operation_specs
 from apache_beam.utils.windowed_value cimport WindowedValue
-from pyflink.fn_execution.fast_coder_impl cimport InputStreamAndFunctionWrapper
 
+from pyflink.fn_execution.beam.beam_stream cimport BeamInputStream, BeamOutputStream
+from pyflink.fn_execution.beam.beam_coder_impl cimport InputStreamWrapper
 from pyflink.fn_execution import flink_fn_execution_pb2
 from pyflink.metrics.metricbase import GenericMetricGroup
 from pyflink.serializers import PickleSerializer
@@ -45,13 +46,13 @@ cdef class StatelessFunctionOperation(Operation):
     def __init__(self, name, spec, counter_factory, sampler, consumers):
         super(StatelessFunctionOperation, self).__init__(name, spec, counter_factory, sampler)
         self.consumer = consumers['output'][0]
-        self._value_coder_impl = self.consumer.windowed_coder.wrapped_value_coder.get_impl()
-        value_coder = self._value_coder_impl._value_coder
-        from pyflink.fn_execution.coder_impl import ArrowCoderImpl
-        if isinstance(value_coder, ArrowCoderImpl):
+        self._value_coder_impl = self.consumer.windowed_coder.wrapped_value_coder.get_impl()._value_coder
+        from pyflink.fn_execution.beam.beam_slow_coder_impl import ArrowCoderImpl
+        if isinstance(self._value_coder_impl, ArrowCoderImpl):
             self._is_python_coder = True
         else:
             self._is_python_coder = False
+            self._output_coder = self._value_coder_impl._value_coder
 
         self.variable_dict = {}
         self.user_defined_funcs = []
@@ -80,15 +81,25 @@ cdef class StatelessFunctionOperation(Operation):
                 user_defined_func.close()
 
     cpdef process(self, WindowedValue o):
-        cdef InputStreamAndFunctionWrapper wrapper
+        cdef InputStreamWrapper input_stream_wrapper
+        cdef BeamInputStream input_stream
+        cdef BaseCoderImpl input_coder
+        cdef BeamOutputStream output_stream
         with self.scoped_process_state:
-            output_stream = self.consumer.output_stream
             if self._is_python_coder:
-                self._value_coder_impl.encode_to_stream(self.func(o.value), output_stream, True)
+                self._value_coder_impl.encode_to_stream(
+                    self.func(o.value), self.consumer.output_stream, True)
+                self.consumer.output_stream.maybe_flush()
             else:
-                wrapper = InputStreamAndFunctionWrapper(self.func, o.value)
-                self._value_coder_impl.encode_to_stream(wrapper, output_stream, True)
-            output_stream.maybe_flush()
+                input_stream_wrapper = o.value
+                input_stream = input_stream_wrapper._input_stream
+                input_coder = input_stream_wrapper._value_coder
+                output_stream = BeamOutputStream(self.consumer.output_stream)
+                while input_stream.available():
+                    input_data = input_coder.decode(input_stream)
+                    result = self.func(input_data)
+                    self._output_coder.encode(result, output_stream)
+                output_stream.flush()
 
     def progress_metrics(self):
         metrics = super(StatelessFunctionOperation, self).progress_metrics()
diff --git a/flink-python/pyflink/fn_execution/sdk_worker_main.py b/flink-python/pyflink/fn_execution/stream.pxd
similarity index 56%
copy from flink-python/pyflink/fn_execution/sdk_worker_main.py
copy to flink-python/pyflink/fn_execution/stream.pxd
index 5e750af..e2462ff 100644
--- a/flink-python/pyflink/fn_execution/sdk_worker_main.py
+++ b/flink-python/pyflink/fn_execution/stream.pxd
@@ -15,27 +15,12 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import os
-import sys
+# cython: language_level = 3
 
-# force to register the operations to SDK Harness
-from apache_beam.options.pipeline_options import PipelineOptions
+cdef class InputStream:
+    cdef size_t read(self, char**data)
+    cdef size_t available(self)
 
-try:
-    import pyflink.fn_execution.fast_operations
-except ImportError:
-    import pyflink.fn_execution.operations
-
-# force to register the coders to SDK Harness
-import pyflink.fn_execution.coders # noqa # pylint: disable=unused-import
-
-import apache_beam.runners.worker.sdk_worker_main
-
-if 'PIPELINE_OPTIONS' in os.environ:
-    pipeline_options = apache_beam.runners.worker.sdk_worker_main._parse_pipeline_options(
-        os.environ['PIPELINE_OPTIONS'])
-else:
-    pipeline_options = PipelineOptions.from_dictionary({})
-
-if __name__ == '__main__':
-    apache_beam.runners.worker.sdk_worker_main.main(sys.argv)
+cdef class OutputStream:
+    cdef void write(self, char*data, size_t length)
+    cpdef void flush(self)
diff --git a/flink-python/pyflink/fn_execution/sdk_worker_main.py b/flink-python/pyflink/fn_execution/stream.pyx
similarity index 56%
rename from flink-python/pyflink/fn_execution/sdk_worker_main.py
rename to flink-python/pyflink/fn_execution/stream.pyx
index 5e750af..895c7c6 100644
--- a/flink-python/pyflink/fn_execution/sdk_worker_main.py
+++ b/flink-python/pyflink/fn_execution/stream.pyx
@@ -15,27 +15,16 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import os
-import sys
+# cython: language_level = 3
 
-# force to register the operations to SDK Harness
-from apache_beam.options.pipeline_options import PipelineOptions
+cdef class InputStream:
+    cdef size_t read(self, char** data):
+        pass
+    cdef size_t available(self):
+        pass
 
-try:
-    import pyflink.fn_execution.fast_operations
-except ImportError:
-    import pyflink.fn_execution.operations
-
-# force to register the coders to SDK Harness
-import pyflink.fn_execution.coders # noqa # pylint: disable=unused-import
-
-import apache_beam.runners.worker.sdk_worker_main
-
-if 'PIPELINE_OPTIONS' in os.environ:
-    pipeline_options = apache_beam.runners.worker.sdk_worker_main._parse_pipeline_options(
-        os.environ['PIPELINE_OPTIONS'])
-else:
-    pipeline_options = PipelineOptions.from_dictionary({})
-
-if __name__ == '__main__':
-    apache_beam.runners.worker.sdk_worker_main.main(sys.argv)
+cdef class OutputStream:
+    cdef void write(self, char*data, size_t length):
+        pass
+    cpdef void flush(self):
+        pass
diff --git a/flink-python/pyflink/fn_execution/tests/test_coders.py b/flink-python/pyflink/fn_execution/tests/test_coders.py
index 3134e83..eb6fba8 100644
--- a/flink-python/pyflink/fn_execution/tests/test_coders.py
+++ b/flink-python/pyflink/fn_execution/tests/test_coders.py
@@ -38,12 +38,13 @@ except ImportError:
 class CodersTest(unittest.TestCase):
 
     def check_coder(self, coder, *values):
+        coder_impl = coder.get_impl()
         for v in values:
             if isinstance(v, float):
                 from pyflink.table.tests.test_udf import float_equal
-                assert float_equal(v, coder.decode(coder.encode(v)), 1e-6)
+                assert float_equal(v, coder_impl.decode(coder_impl.encode(v)), 1e-6)
             else:
-                self.assertEqual(v, coder.decode(coder.encode(v)))
+                self.assertEqual(v, coder_impl.decode(coder_impl.encode(v)))
 
     # decide whether two floats are equal
     @staticmethod
@@ -137,7 +138,7 @@ class CodersTest(unittest.TestCase):
     def test_flatten_row_coder(self):
         field_coder = BigIntCoder()
         field_count = 10
-        coder = FlattenRowCoder([field_coder for _ in range(field_count)])
+        coder = FlattenRowCoder([field_coder for _ in range(field_count)]).get_impl()
         v = [[None if i % 2 == 0 else i for i in range(field_count)]]
         generator_result = coder.decode(coder.encode(v))
         result = []
diff --git a/flink-python/pyflink/fn_execution/tests/test_fast_coders.py b/flink-python/pyflink/fn_execution/tests/test_fast_coders.py
index 38ea3be..a3491ea 100644
--- a/flink-python/pyflink/fn_execution/tests/test_fast_coders.py
+++ b/flink-python/pyflink/fn_execution/tests/test_fast_coders.py
@@ -20,10 +20,9 @@
 import logging
 import unittest
 
-from pyflink.fn_execution import coder_impl
-
 try:
     from pyflink.fn_execution import fast_coder_impl
+    from pyflink.fn_execution.beam import beam_slow_coder_impl as coder_impl
 
     have_cython = True
 except ImportError:
@@ -35,18 +34,19 @@ class CodersTest(unittest.TestCase):
 
     def check_cython_coder(self, python_field_coders, cython_field_coders, data):
         from apache_beam.coders.coder_impl import create_InputStream, create_OutputStream
-        from pyflink.fn_execution.fast_coder_impl import InputStreamAndFunctionWrapper
+        from pyflink.fn_execution.beam.beam_stream import BeamInputStream, BeamOutputStream
         py_flatten_row_coder = coder_impl.FlattenRowCoderImpl(python_field_coders)
         internal = py_flatten_row_coder.encode(data)
-        input_stream = create_InputStream(internal)
-        output_stream = create_OutputStream()
+        beam_input_stream = create_InputStream(internal)
+        input_stream = BeamInputStream(beam_input_stream, beam_input_stream.size())
+        beam_output_stream = create_OutputStream()
         cy_flatten_row_coder = fast_coder_impl.FlattenRowCoderImpl(cython_field_coders)
-        value = cy_flatten_row_coder.decode_from_stream(input_stream, False)
-        wrapper_func_input_element = InputStreamAndFunctionWrapper(
-            lambda v: [v[i] for i in range(len(v))], value)
-        cy_flatten_row_coder.encode_to_stream(wrapper_func_input_element, output_stream, False)
+        value = cy_flatten_row_coder.decode(input_stream)
+        output_stream = BeamOutputStream(beam_output_stream)
+        cy_flatten_row_coder.encode(value, output_stream)
+        output_stream.flush()
         generator_result = py_flatten_row_coder.decode_from_stream(create_InputStream(
-            output_stream.get()), False)
+            beam_output_stream.get()), False)
         result = []
         for item in generator_result:
             result.append(item)
diff --git a/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py b/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
index 9b8e785..af80ed4 100644
--- a/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
+++ b/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
@@ -126,7 +126,10 @@ class PythonBootTests(PyFlinkTestCase):
         fn_execution_dir = os.path.join(pyflink_dir, "fn_execution")
         os.mkdir(fn_execution_dir)
         open(os.path.join(fn_execution_dir, "__init__.py"), 'a').close()
-        with open(os.path.join(fn_execution_dir, "boot.py"), "w") as f:
+        beam_dir = os.path.join(fn_execution_dir, "beam")
+        os.mkdir(beam_dir)
+        open(os.path.join(beam_dir, "__init__.py"), 'a').close()
+        with open(os.path.join(beam_dir, "beam_boot.py"), "w") as f:
             f.write("import os\nwith open(r'%s', 'w') as f:\n    f.write(os.getcwd())" %
                     output_file)
 
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 22ca08a..fc78723 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -68,7 +68,19 @@ else:
             Extension(
                 name="pyflink.fn_execution.fast_operations",
                 sources=["pyflink/fn_execution/fast_operations.pyx"],
-                include_dirs=["pyflink/fn_execution/"])
+                include_dirs=["pyflink/fn_execution/"]),
+            Extension(
+                name="pyflink.fn_execution.stream",
+                sources=["pyflink/fn_execution/stream.pyx"],
+                include_dirs=["pyflink/fn_execution/"]),
+            Extension(
+                name="pyflink.fn_execution.beam.beam_stream",
+                sources=["pyflink/fn_execution/beam/beam_stream.pyx"],
+                include_dirs=["pyflink/fn_execution/beam"]),
+            Extension(
+                name="pyflink.fn_execution.beam.beam_coder_impl",
+                sources=["pyflink/fn_execution/beam/beam_coder_impl.pyx"],
+                include_dirs=["pyflink/fn_execution/beam"]),
         ])
     except ImportError:
         if os.path.exists("pyflink/fn_execution/fast_coder_impl.c"):
@@ -80,7 +92,19 @@ else:
                 Extension(
                     name="pyflink.fn_execution.fast_operations",
                     sources=["pyflink/fn_execution/fast_operations.c"],
-                    include_dirs=["pyflink/fn_execution/"])
+                    include_dirs=["pyflink/fn_execution/"]),
+                Extension(
+                    name="pyflink.fn_execution.stream",
+                    sources=["pyflink/fn_execution/stream.c"],
+                    include_dirs=["pyflink/fn_execution/"]),
+                Extension(
+                    name="pyflink.fn_execution.beam.beam_stream",
+                    sources=["pyflink/fn_execution/beam/beam_stream.c"],
+                    include_dirs=["pyflink/fn_execution/beam"]),
+                Extension(
+                    name="pyflink.fn_execution.beam.beam_coder_impl",
+                    sources=["pyflink/fn_execution/beam/beam_coder_impl.c"],
+                    include_dirs=["pyflink/fn_execution/beam"]),
             ])
         else:
             extensions = ([])
@@ -238,6 +262,7 @@ run sdist.
                 'pyflink.dataset',
                 'pyflink.common',
                 'pyflink.fn_execution',
+                'pyflink.fn_execution.beam',
                 'pyflink.metrics',
                 'pyflink.ml',
                 'pyflink.ml.api',
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamPythonFunctionRunner.java
index 0aa9ee0..2426fed 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamPythonFunctionRunner.java
@@ -42,7 +42,6 @@ import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
-import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -154,11 +153,6 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
 			PipelineOptionsFactory.as(PortablePipelineOptions.class);
 		// one operator has one Python SDK harness
 		portableOptions.setSdkWorkerParallelism(1);
-		ExperimentalOptions experimentalOptions = portableOptions.as(ExperimentalOptions.class);
-		for (Map.Entry<String, String> entry : jobOptions.entrySet()) {
-			ExperimentalOptions.addExperiment(experimentalOptions,
-				String.join("=", entry.getKey(), entry.getValue()));
-		}
 
 		Struct pipelineOptions = PipelineOptionsTranslation.toProto(portableOptions);
 
@@ -219,11 +213,13 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
 		PythonEnvironment environment = environmentManager.createEnvironment();
 		if (environment instanceof ProcessPythonEnvironment) {
 			ProcessPythonEnvironment processEnvironment = (ProcessPythonEnvironment) environment;
+			Map<String, String> env = processEnvironment.getEnv();
+			env.putAll(jobOptions);
 			return Environments.createProcessEnvironment(
 				"",
 				"",
 				processEnvironment.getCommand(),
-				processEnvironment.getEnv());
+				env);
 		}
 		throw new RuntimeException("Currently only ProcessPythonEnvironment is supported.");
 	}