You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by di...@apache.org on 2020/07/14 07:13:57 UTC
[flink] branch master updated: [FLINK-18491][python] Extract the Beam specific coder classes into a separate Python module
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ca6cef9 [FLINK-18491][python] Extract the Beam specific coder classes into a separate Python module
ca6cef9 is described below
commit ca6cef968efd4153e5de620977784454f0fb0c1f
Author: huangxingbo <hx...@gmail.com>
AuthorDate: Wed Jul 8 19:36:29 2020 +0800
[FLINK-18491][python] Extract the Beam specific coder classes into a separate Python module
This closes #12870.
---
flink-python/bin/pyflink-udf-runner.sh | 2 +-
.../{sdk_worker_main.py => beam/__init__.py} | 24 --
.../fn_execution/{boot.py => beam/beam_boot.py} | 2 +-
.../beam_coder_impl.pxd} | 33 +-
.../pyflink/fn_execution/beam/beam_coder_impl.pyx | 58 ++++
.../pyflink/fn_execution/beam/beam_coders.py | 213 +++++++++++++
.../beam_sdk_worker_main.py} | 11 +-
.../beam_slow_coder_impl.py} | 0
.../{sdk_worker_main.py => beam/beam_stream.pxd} | 37 +--
.../pyflink/fn_execution/beam/beam_stream.pyx | 106 +++++++
flink-python/pyflink/fn_execution/coders.py | 318 +++++--------------
.../pyflink/fn_execution/fast_coder_impl.pxd | 138 +++-----
.../pyflink/fn_execution/fast_coder_impl.pyx | 347 +++++++--------------
.../pyflink/fn_execution/fast_operations.pxd | 5 +-
.../pyflink/fn_execution/fast_operations.pyx | 33 +-
.../{sdk_worker_main.py => stream.pxd} | 29 +-
.../{sdk_worker_main.py => stream.pyx} | 33 +-
.../pyflink/fn_execution/tests/test_coders.py | 7 +-
.../pyflink/fn_execution/tests/test_fast_coders.py | 20 +-
.../fn_execution/tests/test_process_mode_boot.py | 5 +-
flink-python/setup.py | 29 +-
.../python/beam/BeamPythonFunctionRunner.java | 10 +-
22 files changed, 735 insertions(+), 725 deletions(-)
diff --git a/flink-python/bin/pyflink-udf-runner.sh b/flink-python/bin/pyflink-udf-runner.sh
index 04bdbc7..7c0c557 100755
--- a/flink-python/bin/pyflink-udf-runner.sh
+++ b/flink-python/bin/pyflink-udf-runner.sh
@@ -40,4 +40,4 @@ if [[ "$_PYTHON_WORKING_DIR" != "" ]]; then
fi
log="$BOOT_LOG_DIR/flink-python-udf-boot.log"
-${python} -m pyflink.fn_execution.boot $@ 2>&1 | tee ${log}
+${python} -m pyflink.fn_execution.beam.beam_boot $@ 2>&1 | tee ${log}
diff --git a/flink-python/pyflink/fn_execution/sdk_worker_main.py b/flink-python/pyflink/fn_execution/beam/__init__.py
similarity index 56%
copy from flink-python/pyflink/fn_execution/sdk_worker_main.py
copy to flink-python/pyflink/fn_execution/beam/__init__.py
index 5e750af..65b48d4 100644
--- a/flink-python/pyflink/fn_execution/sdk_worker_main.py
+++ b/flink-python/pyflink/fn_execution/beam/__init__.py
@@ -15,27 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-import os
-import sys
-
-# force to register the operations to SDK Harness
-from apache_beam.options.pipeline_options import PipelineOptions
-
-try:
- import pyflink.fn_execution.fast_operations
-except ImportError:
- import pyflink.fn_execution.operations
-
-# force to register the coders to SDK Harness
-import pyflink.fn_execution.coders # noqa # pylint: disable=unused-import
-
-import apache_beam.runners.worker.sdk_worker_main
-
-if 'PIPELINE_OPTIONS' in os.environ:
- pipeline_options = apache_beam.runners.worker.sdk_worker_main._parse_pipeline_options(
- os.environ['PIPELINE_OPTIONS'])
-else:
- pipeline_options = PipelineOptions.from_dictionary({})
-
-if __name__ == '__main__':
- apache_beam.runners.worker.sdk_worker_main.main(sys.argv)
diff --git a/flink-python/pyflink/fn_execution/boot.py b/flink-python/pyflink/fn_execution/beam/beam_boot.py
similarity index 98%
rename from flink-python/pyflink/fn_execution/boot.py
rename to flink-python/pyflink/fn_execution/beam/beam_boot.py
index 1286c91..f460ae6 100644
--- a/flink-python/pyflink/fn_execution/boot.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_boot.py
@@ -103,5 +103,5 @@ if __name__ == "__main__":
if "FLINK_BOOT_TESTING" in os.environ and os.environ["FLINK_BOOT_TESTING"] == "1":
exit(0)
- call([python_exec, "-m", "pyflink.fn_execution.sdk_worker_main"],
+ call([python_exec, "-m", "pyflink.fn_execution.beam.beam_sdk_worker_main"],
stdout=sys.stdout, stderr=sys.stderr, env=env)
diff --git a/flink-python/pyflink/fn_execution/sdk_worker_main.py b/flink-python/pyflink/fn_execution/beam/beam_coder_impl.pxd
similarity index 56%
copy from flink-python/pyflink/fn_execution/sdk_worker_main.py
copy to flink-python/pyflink/fn_execution/beam/beam_coder_impl.pxd
index 5e750af..de58441 100644
--- a/flink-python/pyflink/fn_execution/sdk_worker_main.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_coder_impl.pxd
@@ -15,27 +15,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-import os
-import sys
+# cython: language_level = 3
+# cython: infer_types = True
+# cython: profile=True
+# cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True
-# force to register the operations to SDK Harness
-from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.coders.coder_impl cimport StreamCoderImpl
-try:
- import pyflink.fn_execution.fast_operations
-except ImportError:
- import pyflink.fn_execution.operations
+from pyflink.fn_execution.fast_coder_impl cimport BaseCoderImpl
+from pyflink.fn_execution.stream cimport InputStream
-# force to register the coders to SDK Harness
-import pyflink.fn_execution.coders # noqa # pylint: disable=unused-import
+cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
+ cdef readonly StreamCoderImpl _value_coder
-import apache_beam.runners.worker.sdk_worker_main
+cdef class BeamCoderImpl(StreamCoderImpl):
+ cdef readonly BaseCoderImpl _value_coder
-if 'PIPELINE_OPTIONS' in os.environ:
- pipeline_options = apache_beam.runners.worker.sdk_worker_main._parse_pipeline_options(
- os.environ['PIPELINE_OPTIONS'])
-else:
- pipeline_options = PipelineOptions.from_dictionary({})
-
-if __name__ == '__main__':
- apache_beam.runners.worker.sdk_worker_main.main(sys.argv)
+cdef class InputStreamWrapper:
+ cdef BaseCoderImpl _value_coder
+ cdef InputStream _input_stream
diff --git a/flink-python/pyflink/fn_execution/beam/beam_coder_impl.pyx b/flink-python/pyflink/fn_execution/beam/beam_coder_impl.pyx
new file mode 100644
index 0000000..9874030
--- /dev/null
+++ b/flink-python/pyflink/fn_execution/beam/beam_coder_impl.pyx
@@ -0,0 +1,58 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# cython: language_level = 3
+# cython: infer_types = True
+# cython: profile=True
+# cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True
+
+from apache_beam.coders.coder_impl cimport InputStream as BInputStream
+from apache_beam.coders.coder_impl cimport OutputStream as BOutputStream
+from apache_beam.coders.coder_impl cimport StreamCoderImpl
+
+from pyflink.fn_execution.beam.beam_stream cimport BeamInputStream
+
+cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
+ def __cinit__(self, value_coder):
+ self._value_coder = value_coder
+
+ cpdef encode_to_stream(self, value, BOutputStream out_stream, bint nested):
+ self._value_coder.encode_to_stream(value, out_stream, nested)
+
+ cpdef decode_from_stream(self, BInputStream in_stream, bint nested):
+ return self._value_coder.decode_from_stream(in_stream, nested)
+
+ cpdef get_estimated_size_and_observables(self, value, bint nested=False):
+ return 0, []
+
+cdef class BeamCoderImpl(StreamCoderImpl):
+ def __cinit__(self, value_coder):
+ self._value_coder = value_coder
+
+ cpdef encode_to_stream(self, value, BOutputStream out_stream, bint nested):
+ self._value_coder.encode(value, out_stream)
+
+ cpdef decode_from_stream(self, BInputStream in_stream, bint nested):
+ cdef BeamInputStream input_stream = BeamInputStream(in_stream, in_stream.size())
+ cdef InputStreamWrapper input_stream_wrapper = InputStreamWrapper(self._value_coder,
+ input_stream)
+ return input_stream_wrapper
+
+cdef class InputStreamWrapper:
+ def __cinit__(self, value_coder, input_stream):
+ self._value_coder = value_coder
+ self._input_stream = input_stream
diff --git a/flink-python/pyflink/fn_execution/beam/beam_coders.py b/flink-python/pyflink/fn_execution/beam/beam_coders.py
new file mode 100644
index 0000000..6c284c9
--- /dev/null
+++ b/flink-python/pyflink/fn_execution/beam/beam_coders.py
@@ -0,0 +1,213 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import pyarrow as pa
+import pytz
+from apache_beam.coders import Coder
+from apache_beam.coders.coders import FastCoder, LengthPrefixCoder
+from apache_beam.portability import common_urns
+from apache_beam.typehints import typehints
+
+from pyflink.fn_execution.beam import beam_slow_coder_impl
+
+try:
+ from pyflink.fn_execution import fast_coder_impl as coder_impl
+ from pyflink.fn_execution.beam.beam_coder_impl import BeamCoderImpl, \
+ PassThroughLengthPrefixCoderImpl
+except ImportError:
+ coder_impl = beam_slow_coder_impl
+ BeamCoderImpl = lambda a: a
+ PassThroughLengthPrefixCoderImpl = coder_impl.PassThroughLengthPrefixCoderImpl
+
+from pyflink.fn_execution import flink_fn_execution_pb2, coders
+from pyflink.table.types import TinyIntType, SmallIntType, IntType, BigIntType, BooleanType, \
+ FloatType, DoubleType, VarCharType, VarBinaryType, DecimalType, DateType, TimeType, \
+ LocalZonedTimestampType, RowType, RowField, to_arrow_type, TimestampType, ArrayType
+
+
+class PassThroughLengthPrefixCoder(LengthPrefixCoder):
+ """
+ Coder which doesn't prefix the length of the encoded object as the length prefix will be handled
+ by the wrapped value coder.
+ """
+
+ def __init__(self, value_coder):
+ super(PassThroughLengthPrefixCoder, self).__init__(value_coder)
+
+ def _create_impl(self):
+ return PassThroughLengthPrefixCoderImpl(self._value_coder.get_impl())
+
+ def __repr__(self):
+ return 'PassThroughLengthPrefixCoder[%s]' % self._value_coder
+
+
+Coder.register_structured_urn(
+ common_urns.coders.LENGTH_PREFIX.urn, PassThroughLengthPrefixCoder)
+
+
+class BeamTableFunctionRowCoder(FastCoder):
+ """
+ Coder for Table Function Row.
+ """
+
+ def __init__(self, table_function_row_coder):
+ self._table_function_row_coder = table_function_row_coder
+
+ def _create_impl(self):
+ return self._table_function_row_coder.get_impl()
+
+ def get_impl(self):
+ return BeamCoderImpl(self._create_impl())
+
+ def to_type_hint(self):
+ return typehints.List
+
+ @Coder.register_urn(coders.FLINK_TABLE_FUNCTION_SCHEMA_CODER_URN, flink_fn_execution_pb2.Schema)
+ def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
+ return BeamTableFunctionRowCoder(
+ coders.TableFunctionRowCoder.from_schema_proto(schema_proto))
+
+ def __repr__(self):
+ return 'TableFunctionRowCoder[%s]' % repr(self._table_function_row_coder)
+
+ def __eq__(self, other):
+ return (self.__class__ == other.__class__
+ and self._table_function_row_coder == other._table_function_row_coder)
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __hash__(self):
+ return hash(self._table_function_row_coder)
+
+
+class BeamFlattenRowCoder(FastCoder):
+ """
+ Coder for Row. The decoded result will be flattened as a list of column values of a row instead
+ of a row object.
+ """
+
+ def __init__(self, flatten_coder):
+ self._flatten_coder = flatten_coder
+
+ def _create_impl(self):
+ return self._flatten_coder.get_impl()
+
+ def get_impl(self):
+ return BeamCoderImpl(self._create_impl())
+
+ def to_type_hint(self):
+ return typehints.List
+
+ @Coder.register_urn(coders.FLINK_SCALAR_FUNCTION_SCHEMA_CODER_URN,
+ flink_fn_execution_pb2.Schema)
+ def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
+ return BeamFlattenRowCoder(coders.FlattenRowCoder.from_schema_proto(schema_proto))
+
+ def __repr__(self):
+ return 'BeamFlattenRowCoder[%s]' % repr(self._flatten_coder)
+
+ def __eq__(self, other):
+ return (self.__class__ == other.__class__
+ and self._flatten_coder == other._flatten_coder)
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __hash__(self):
+ return hash(self._flatten_coder)
+
+
+class ArrowCoder(FastCoder):
+ """
+ Coder for Arrow.
+ """
+
+ def __init__(self, schema, row_type, timezone):
+ self._schema = schema
+ self._row_type = row_type
+ self._timezone = timezone
+
+ def _create_impl(self):
+ return beam_slow_coder_impl.ArrowCoderImpl(self._schema, self._row_type, self._timezone)
+
+ def to_type_hint(self):
+ import pandas as pd
+ return pd.Series
+
+ @Coder.register_urn(coders.FLINK_SCALAR_FUNCTION_SCHEMA_ARROW_CODER_URN,
+ flink_fn_execution_pb2.Schema)
+ def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
+
+ def _to_arrow_schema(row_type):
+ return pa.schema([pa.field(n, to_arrow_type(t), t._nullable)
+ for n, t in zip(row_type.field_names(), row_type.field_types())])
+
+ def _to_data_type(field_type):
+ if field_type.type_name == flink_fn_execution_pb2.Schema.TINYINT:
+ return TinyIntType(field_type.nullable)
+ elif field_type.type_name == flink_fn_execution_pb2.Schema.SMALLINT:
+ return SmallIntType(field_type.nullable)
+ elif field_type.type_name == flink_fn_execution_pb2.Schema.INT:
+ return IntType(field_type.nullable)
+ elif field_type.type_name == flink_fn_execution_pb2.Schema.BIGINT:
+ return BigIntType(field_type.nullable)
+ elif field_type.type_name == flink_fn_execution_pb2.Schema.BOOLEAN:
+ return BooleanType(field_type.nullable)
+ elif field_type.type_name == flink_fn_execution_pb2.Schema.FLOAT:
+ return FloatType(field_type.nullable)
+ elif field_type.type_name == flink_fn_execution_pb2.Schema.DOUBLE:
+ return DoubleType(field_type.nullable)
+ elif field_type.type_name == flink_fn_execution_pb2.Schema.VARCHAR:
+ return VarCharType(0x7fffffff, field_type.nullable)
+ elif field_type.type_name == flink_fn_execution_pb2.Schema.VARBINARY:
+ return VarBinaryType(0x7fffffff, field_type.nullable)
+ elif field_type.type_name == flink_fn_execution_pb2.Schema.DECIMAL:
+ return DecimalType(field_type.decimal_info.precision,
+ field_type.decimal_info.scale,
+ field_type.nullable)
+ elif field_type.type_name == flink_fn_execution_pb2.Schema.DATE:
+ return DateType(field_type.nullable)
+ elif field_type.type_name == flink_fn_execution_pb2.Schema.TIME:
+ return TimeType(field_type.time_info.precision, field_type.nullable)
+ elif field_type.type_name == \
+ flink_fn_execution_pb2.Schema.LOCAL_ZONED_TIMESTAMP:
+ return LocalZonedTimestampType(field_type.local_zoned_timestamp_info.precision,
+ field_type.nullable)
+ elif field_type.type_name == flink_fn_execution_pb2.Schema.TIMESTAMP:
+ return TimestampType(field_type.timestamp_info.precision, field_type.nullable)
+ elif field_type.type_name == flink_fn_execution_pb2.Schema.ARRAY:
+ return ArrayType(_to_data_type(field_type.collection_element_type),
+ field_type.nullable)
+ elif field_type.type_name == flink_fn_execution_pb2.Schema.TypeName.ROW:
+ return RowType(
+ [RowField(f.name, _to_data_type(f.type), f.description)
+ for f in field_type.row_schema.fields], field_type.nullable)
+ else:
+ raise ValueError("field_type %s is not supported." % field_type)
+
+ def _to_row_type(row_schema):
+ return RowType([RowField(f.name, _to_data_type(f.type)) for f in row_schema.fields])
+
+ timezone = pytz.timezone(os.environ['table.exec.timezone'])
+ row_type = _to_row_type(schema_proto)
+ return ArrowCoder(_to_arrow_schema(row_type), row_type, timezone)
+
+ def __repr__(self):
+ return 'ArrowCoder[%s]' % self._schema
diff --git a/flink-python/pyflink/fn_execution/sdk_worker_main.py b/flink-python/pyflink/fn_execution/beam/beam_sdk_worker_main.py
similarity index 77%
copy from flink-python/pyflink/fn_execution/sdk_worker_main.py
copy to flink-python/pyflink/fn_execution/beam/beam_sdk_worker_main.py
index 5e750af..48f7a31 100644
--- a/flink-python/pyflink/fn_execution/sdk_worker_main.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_sdk_worker_main.py
@@ -15,27 +15,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-import os
import sys
# force to register the operations to SDK Harness
-from apache_beam.options.pipeline_options import PipelineOptions
-
try:
import pyflink.fn_execution.fast_operations
except ImportError:
import pyflink.fn_execution.operations
# force to register the coders to SDK Harness
-import pyflink.fn_execution.coders # noqa # pylint: disable=unused-import
+import pyflink.fn_execution.beam.beam_coders # noqa # pylint: disable=unused-import
import apache_beam.runners.worker.sdk_worker_main
-if 'PIPELINE_OPTIONS' in os.environ:
- pipeline_options = apache_beam.runners.worker.sdk_worker_main._parse_pipeline_options(
- os.environ['PIPELINE_OPTIONS'])
-else:
- pipeline_options = PipelineOptions.from_dictionary({})
-
if __name__ == '__main__':
apache_beam.runners.worker.sdk_worker_main.main(sys.argv)
diff --git a/flink-python/pyflink/fn_execution/coder_impl.py b/flink-python/pyflink/fn_execution/beam/beam_slow_coder_impl.py
similarity index 100%
rename from flink-python/pyflink/fn_execution/coder_impl.py
rename to flink-python/pyflink/fn_execution/beam/beam_slow_coder_impl.py
diff --git a/flink-python/pyflink/fn_execution/sdk_worker_main.py b/flink-python/pyflink/fn_execution/beam/beam_stream.pxd
similarity index 55%
copy from flink-python/pyflink/fn_execution/sdk_worker_main.py
copy to flink-python/pyflink/fn_execution/beam/beam_stream.pxd
index 5e750af..4876550 100644
--- a/flink-python/pyflink/fn_execution/sdk_worker_main.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_stream.pxd
@@ -15,27 +15,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-import os
-import sys
+# cython: language_level = 3
-# force to register the operations to SDK Harness
-from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.coders.coder_impl cimport InputStream as BInputStream
+from apache_beam.coders.coder_impl cimport OutputStream as BOutputStream
-try:
- import pyflink.fn_execution.fast_operations
-except ImportError:
- import pyflink.fn_execution.operations
+from pyflink.fn_execution.stream cimport InputStream, OutputStream
-# force to register the coders to SDK Harness
-import pyflink.fn_execution.coders # noqa # pylint: disable=unused-import
+cdef class BeamInputStream(InputStream):
+ cdef char*_input_data
+ cdef size_t _input_buffer_size
+ cdef size_t _input_pos
+ cdef void _parse_input_stream(self, BInputStream input_stream)
-import apache_beam.runners.worker.sdk_worker_main
-
-if 'PIPELINE_OPTIONS' in os.environ:
- pipeline_options = apache_beam.runners.worker.sdk_worker_main._parse_pipeline_options(
- os.environ['PIPELINE_OPTIONS'])
-else:
- pipeline_options = PipelineOptions.from_dictionary({})
-
-if __name__ == '__main__':
- apache_beam.runners.worker.sdk_worker_main.main(sys.argv)
+cdef class BeamOutputStream(OutputStream):
+ cdef char*_output_data
+ cdef size_t _output_pos
+ cdef size_t _output_buffer_size
+ cdef BOutputStream _output_stream
+ cdef void _map_output_data_to_output_stream(self)
+ cdef void _maybe_flush(self)
+ cdef void _parse_output_stream(self, BOutputStream output_stream)
diff --git a/flink-python/pyflink/fn_execution/beam/beam_stream.pyx b/flink-python/pyflink/fn_execution/beam/beam_stream.pyx
new file mode 100644
index 0000000..d461470
--- /dev/null
+++ b/flink-python/pyflink/fn_execution/beam/beam_stream.pyx
@@ -0,0 +1,106 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# cython: language_level = 3
+# cython: infer_types = True
+# cython: profile=True
+# cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True
+
+from libc.stdlib cimport realloc
+from libc.string cimport memcpy
+
+cdef class BeamInputStream(InputStream):
+ def __cinit__(self, input_stream, size):
+ self._input_buffer_size = size
+ self._input_pos = 0
+ self._parse_input_stream(input_stream)
+
+ cdef size_t read(self, char** data):
+ cdef size_t length = 0
+ cdef bint has_prefix = True
+ cdef size_t shift = 0
+ cdef char bits
+ # read the var-int size
+ while has_prefix:
+ bits = self._input_data[self._input_pos] & 0x7F
+ length |= bits << shift
+ shift += 7
+ if not (self._input_data[self._input_pos] & 0x80):
+ has_prefix = False
+ self._input_pos += 1
+ data[0] = self._input_data + self._input_pos
+ self._input_pos += length
+ return length
+
+ cdef size_t available(self):
+ return self._input_buffer_size - self._input_pos
+
+ cdef void _parse_input_stream(self, BInputStream input_stream):
+ self._input_data = input_stream.allc
+ input_stream.pos = self._input_buffer_size
+
+cdef class BeamOutputStream(OutputStream):
+ def __cinit__(self, output_stream):
+ self._output_stream = output_stream
+ self._parse_output_stream(output_stream)
+
+ cdef void write(self, char*data, size_t length):
+ cdef char bits
+ cdef size_t size = length
+ # the length of the variable prefix length will be less than 9 bytes
+ if self._output_buffer_size < self._output_pos + length + 9:
+ self._output_buffer_size += length + 9
+ self._output_data = <char*> realloc(self._output_data,
+ self._output_buffer_size)
+ # write variable prefix length
+ while size:
+ bits = size & 0x7F
+ size >>= 7
+ if size:
+ bits |= 0x80
+ self._output_data[self._output_pos] = bits
+ self._output_pos += 1
+
+ if length < 8:
+ # This is faster than memcpy when the string is short.
+ for i in range(length):
+ self._output_data[self._output_pos + i] = data[i]
+ else:
+ memcpy(self._output_data + self._output_pos, data, length)
+ self._output_pos += length
+ self._maybe_flush()
+
+ cpdef void flush(self):
+ cdef size_t i
+ self._map_output_data_to_output_stream()
+ self._output_stream.maybe_flush()
+
+ cdef void _parse_output_stream(self, BOutputStream output_stream):
+ self._output_data = output_stream.data
+ self._output_pos = output_stream.pos
+ self._output_buffer_size = output_stream.buffer_size
+
+ cdef void _maybe_flush(self):
+ if self._output_pos > 10_000_000:
+ self._map_output_data_to_output_stream()
+ self._output_stream.flush()
+ self._output_pos = 0
+
+ cdef void _map_output_data_to_output_stream(self):
+ self._output_stream.data = self._output_data
+ self._output_stream.pos = self._output_pos
+ self._output_stream.buffer_size = self._output_buffer_size
diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py
index d2c9e79..8969f31 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -16,58 +16,51 @@
# limitations under the License.
################################################################################
+import os
from abc import ABC
-
-import datetime
-import decimal
-import pyarrow as pa
import pytz
-from apache_beam.coders import Coder
-from apache_beam.coders.coders import FastCoder, LengthPrefixCoder
-from apache_beam.options.pipeline_options import DebugOptions
-from apache_beam.portability import common_urns
-from apache_beam.typehints import typehints
-from pyflink.fn_execution import coder_impl as slow_coder_impl
+from pyflink.fn_execution import flink_fn_execution_pb2
+
try:
from pyflink.fn_execution import fast_coder_impl as coder_impl
-except ImportError:
- coder_impl = slow_coder_impl
-from pyflink.fn_execution import flink_fn_execution_pb2
-from pyflink.fn_execution.sdk_worker_main import pipeline_options
-from pyflink.table.types import Row, TinyIntType, SmallIntType, IntType, BigIntType, BooleanType, \
- FloatType, DoubleType, VarCharType, VarBinaryType, DecimalType, DateType, TimeType, \
- LocalZonedTimestampType, RowType, RowField, to_arrow_type, TimestampType, ArrayType
+except:
+ from pyflink.fn_execution.beam import beam_slow_coder_impl as coder_impl
+
+__all__ = ['RowCoder', 'BigIntCoder', 'TinyIntCoder', 'BooleanCoder',
+ 'SmallIntCoder', 'IntCoder', 'FloatCoder', 'DoubleCoder',
+ 'BinaryCoder', 'CharCoder', 'DateCoder', 'TimeCoder',
+ 'TimestampCoder', 'ArrayCoder', 'MapCoder', 'DecimalCoder']
FLINK_SCALAR_FUNCTION_SCHEMA_CODER_URN = "flink:coder:schema:scalar_function:v1"
FLINK_TABLE_FUNCTION_SCHEMA_CODER_URN = "flink:coder:schema:table_function:v1"
FLINK_SCALAR_FUNCTION_SCHEMA_ARROW_CODER_URN = "flink:coder:schema:scalar_function:arrow:v1"
-__all__ = ['FlattenRowCoder', 'RowCoder', 'BigIntCoder', 'TinyIntCoder', 'BooleanCoder',
- 'SmallIntCoder', 'IntCoder', 'FloatCoder', 'DoubleCoder',
- 'BinaryCoder', 'CharCoder', 'DateCoder', 'TimeCoder',
- 'TimestampCoder', 'ArrayCoder', 'MapCoder', 'DecimalCoder', 'ArrowCoder']
+class BaseCoder(ABC):
+ def get_impl(self):
+ pass
+
+ @staticmethod
+ def from_schema_proto(schema_proto):
+ pass
-class TableFunctionRowCoder(FastCoder):
+class TableFunctionRowCoder(BaseCoder):
"""
Coder for Table Function Row.
"""
+
def __init__(self, flatten_row_coder):
self._flatten_row_coder = flatten_row_coder
- def _create_impl(self):
+ def get_impl(self):
return coder_impl.TableFunctionRowCoderImpl(self._flatten_row_coder.get_impl())
- def to_type_hint(self):
- return typehints.List
-
- @Coder.register_urn(FLINK_TABLE_FUNCTION_SCHEMA_CODER_URN, flink_fn_execution_pb2.Schema)
- def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
- return TableFunctionRowCoder(FlattenRowCoder([from_proto(f.type)
- for f in schema_proto.fields]))
+ @staticmethod
+ def from_schema_proto(schema_proto):
+ return TableFunctionRowCoder(FlattenRowCoder.from_schema_proto(schema_proto))
def __repr__(self):
return 'TableFunctionRowCoder[%s]' % repr(self._flatten_row_coder)
@@ -83,7 +76,7 @@ class TableFunctionRowCoder(FastCoder):
return hash(self._flatten_row_coder)
-class FlattenRowCoder(FastCoder):
+class FlattenRowCoder(BaseCoder):
"""
Coder for Row. The decoded result will be flattened as a list of column values of a row instead
of a row object.
@@ -92,17 +85,11 @@ class FlattenRowCoder(FastCoder):
def __init__(self, field_coders):
self._field_coders = field_coders
- def _create_impl(self):
+ def get_impl(self):
return coder_impl.FlattenRowCoderImpl([c.get_impl() for c in self._field_coders])
- def is_deterministic(self):
- return all(c.is_deterministic() for c in self._field_coders)
-
- def to_type_hint(self):
- return typehints.List
-
- @Coder.register_urn(FLINK_SCALAR_FUNCTION_SCHEMA_CODER_URN, flink_fn_execution_pb2.Schema)
- def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
+ @staticmethod
+ def from_schema_proto(schema_proto):
return FlattenRowCoder([from_proto(f.type) for f in schema_proto.fields])
def __repr__(self):
@@ -121,46 +108,37 @@ class FlattenRowCoder(FastCoder):
return hash(self._field_coders)
-class RowCoder(FlattenRowCoder):
+class FieldCoder(ABC):
+ def get_impl(self):
+ pass
+
+
+class RowCoder(FieldCoder):
"""
Coder for Row.
"""
def __init__(self, field_coders):
- super(RowCoder, self).__init__(field_coders)
-
- def _create_impl(self):
- return coder_impl.RowCoderImpl([c.get_impl() for c in self._field_coders])
+ self._field_coders = field_coders
def get_impl(self):
- return self._create_impl()
-
- def to_type_hint(self):
- return Row
+ return coder_impl.RowCoderImpl([c.get_impl() for c in self._field_coders])
def __repr__(self):
return 'RowCoder[%s]' % ', '.join(str(c) for c in self._field_coders)
-class CollectionCoder(FastCoder):
+class CollectionCoder(FieldCoder):
"""
Base coder for collection.
"""
+
def __init__(self, elem_coder):
self._elem_coder = elem_coder
- def _create_impl(self):
- raise NotImplementedError
-
- def get_impl(self):
- return self._create_impl()
-
def is_deterministic(self):
return self._elem_coder.is_deterministic()
- def to_type_hint(self):
- return []
-
def __eq__(self, other):
return (self.__class__ == other.__class__
and self._elem_coder == other._elem_coder)
@@ -184,11 +162,11 @@ class ArrayCoder(CollectionCoder):
self._elem_coder = elem_coder
super(ArrayCoder, self).__init__(elem_coder)
- def _create_impl(self):
+ def get_impl(self):
return coder_impl.ArrayCoderImpl(self._elem_coder.get_impl())
-class MapCoder(FastCoder):
+class MapCoder(FieldCoder):
"""
Coder for Map.
"""
@@ -197,18 +175,12 @@ class MapCoder(FastCoder):
self._key_coder = key_coder
self._value_coder = value_coder
- def _create_impl(self):
- return coder_impl.MapCoderImpl(self._key_coder.get_impl(), self._value_coder.get_impl())
-
def get_impl(self):
- return self._create_impl()
+ return coder_impl.MapCoderImpl(self._key_coder.get_impl(), self._value_coder.get_impl())
def is_deterministic(self):
return self._key_coder.is_deterministic() and self._value_coder.is_deterministic()
- def to_type_hint(self):
- return {}
-
def __repr__(self):
return 'MapCoder[%s]' % ','.join([repr(self._key_coder), repr(self._value_coder)])
@@ -224,103 +196,70 @@ class MapCoder(FastCoder):
return hash([self._key_coder, self._value_coder])
-class DeterministicCoder(FastCoder, ABC):
- """
- Base Coder for all deterministic Coders.
- """
-
- def is_deterministic(self):
- return True
-
- def get_impl(self):
- return self._create_impl()
-
-
-class BigIntCoder(DeterministicCoder):
+class BigIntCoder(FieldCoder):
"""
Coder for 8 bytes long.
"""
- def _create_impl(self):
+ def get_impl(self):
return coder_impl.BigIntCoderImpl()
- def to_type_hint(self):
- return int
-
-class TinyIntCoder(DeterministicCoder):
+class TinyIntCoder(FieldCoder):
"""
Coder for Byte.
"""
- def _create_impl(self):
+ def get_impl(self):
return coder_impl.TinyIntCoderImpl()
- def to_type_hint(self):
- return int
-
-class BooleanCoder(DeterministicCoder):
+class BooleanCoder(FieldCoder):
"""
Coder for Boolean.
"""
- def _create_impl(self):
+ def get_impl(self):
return coder_impl.BooleanCoderImpl()
- def to_type_hint(self):
- return bool
-
-class SmallIntCoder(DeterministicCoder):
+class SmallIntCoder(FieldCoder):
"""
Coder for Short.
"""
- def _create_impl(self):
+ def get_impl(self):
return coder_impl.SmallIntCoderImpl()
- def to_type_hint(self):
- return int
-
-class IntCoder(DeterministicCoder):
+class IntCoder(FieldCoder):
"""
Coder for 4 bytes int.
"""
- def _create_impl(self):
+ def get_impl(self):
return coder_impl.IntCoderImpl()
- def to_type_hint(self):
- return int
-
-class FloatCoder(DeterministicCoder):
+class FloatCoder(FieldCoder):
"""
Coder for Float.
"""
- def _create_impl(self):
+ def get_impl(self):
return coder_impl.FloatCoderImpl()
- def to_type_hint(self):
- return float
-
-class DoubleCoder(DeterministicCoder):
+class DoubleCoder(FieldCoder):
"""
Coder for Double.
"""
- def _create_impl(self):
+ def get_impl(self):
return coder_impl.DoubleCoderImpl()
- def to_type_hint(self):
- return float
-
-class DecimalCoder(DeterministicCoder):
+class DecimalCoder(FieldCoder):
"""
Coder for Decimal.
"""
@@ -329,61 +268,47 @@ class DecimalCoder(DeterministicCoder):
self.precision = precision
self.scale = scale
- def _create_impl(self):
+ def get_impl(self):
return coder_impl.DecimalCoderImpl(self.precision, self.scale)
- def to_type_hint(self):
- return decimal.Decimal
-
-class BinaryCoder(DeterministicCoder):
+class BinaryCoder(FieldCoder):
"""
Coder for Byte Array.
"""
- def _create_impl(self):
+ def get_impl(self):
return coder_impl.BinaryCoderImpl()
- def to_type_hint(self):
- return bytes
-
-class CharCoder(DeterministicCoder):
+class CharCoder(FieldCoder):
"""
Coder for Character String.
"""
- def _create_impl(self):
- return coder_impl.CharCoderImpl()
- def to_type_hint(self):
- return str
+ def get_impl(self):
+ return coder_impl.CharCoderImpl()
-class DateCoder(DeterministicCoder):
+class DateCoder(FieldCoder):
"""
Coder for Date
"""
- def _create_impl(self):
+ def get_impl(self):
return coder_impl.DateCoderImpl()
- def to_type_hint(self):
- return datetime.date
-
-class TimeCoder(DeterministicCoder):
+class TimeCoder(FieldCoder):
"""
Coder for Time.
"""
- def _create_impl(self):
+ def get_impl(self):
return coder_impl.TimeCoderImpl()
- def to_type_hint(self):
- return datetime.time
-
-class TimestampCoder(DeterministicCoder):
+class TimestampCoder(FieldCoder):
"""
Coder for Timestamp.
"""
@@ -391,14 +316,11 @@ class TimestampCoder(DeterministicCoder):
def __init__(self, precision):
self.precision = precision
- def _create_impl(self):
+ def get_impl(self):
return coder_impl.TimestampCoderImpl(self.precision)
- def to_type_hint(self):
- return datetime.datetime
-
-class LocalZonedTimestampCoder(DeterministicCoder):
+class LocalZonedTimestampCoder(FieldCoder):
"""
Coder for LocalZonedTimestamp.
"""
@@ -407,108 +329,9 @@ class LocalZonedTimestampCoder(DeterministicCoder):
self.precision = precision
self.timezone = timezone
- def _create_impl(self):
+ def get_impl(self):
return coder_impl.LocalZonedTimestampCoderImpl(self.precision, self.timezone)
- def to_type_hint(self):
- return datetime.datetime
-
-
-class ArrowCoder(DeterministicCoder):
- """
- Coder for Arrow.
- """
- def __init__(self, schema, row_type, timezone):
- self._schema = schema
- self._row_type = row_type
- self._timezone = timezone
-
- def _create_impl(self):
- return slow_coder_impl.ArrowCoderImpl(self._schema, self._row_type, self._timezone)
-
- def to_type_hint(self):
- import pandas as pd
- return pd.Series
-
- @Coder.register_urn(FLINK_SCALAR_FUNCTION_SCHEMA_ARROW_CODER_URN,
- flink_fn_execution_pb2.Schema)
- def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
- def _to_arrow_schema(row_type):
- return pa.schema([pa.field(n, to_arrow_type(t), t._nullable)
- for n, t in zip(row_type.field_names(), row_type.field_types())])
-
- def _to_data_type(field_type):
- if field_type.type_name == flink_fn_execution_pb2.Schema.TINYINT:
- return TinyIntType(field_type.nullable)
- elif field_type.type_name == flink_fn_execution_pb2.Schema.SMALLINT:
- return SmallIntType(field_type.nullable)
- elif field_type.type_name == flink_fn_execution_pb2.Schema.INT:
- return IntType(field_type.nullable)
- elif field_type.type_name == flink_fn_execution_pb2.Schema.BIGINT:
- return BigIntType(field_type.nullable)
- elif field_type.type_name == flink_fn_execution_pb2.Schema.BOOLEAN:
- return BooleanType(field_type.nullable)
- elif field_type.type_name == flink_fn_execution_pb2.Schema.FLOAT:
- return FloatType(field_type.nullable)
- elif field_type.type_name == flink_fn_execution_pb2.Schema.DOUBLE:
- return DoubleType(field_type.nullable)
- elif field_type.type_name == flink_fn_execution_pb2.Schema.VARCHAR:
- return VarCharType(0x7fffffff, field_type.nullable)
- elif field_type.type_name == flink_fn_execution_pb2.Schema.VARBINARY:
- return VarBinaryType(0x7fffffff, field_type.nullable)
- elif field_type.type_name == flink_fn_execution_pb2.Schema.DECIMAL:
- return DecimalType(field_type.decimal_info.precision,
- field_type.decimal_info.scale,
- field_type.nullable)
- elif field_type.type_name == flink_fn_execution_pb2.Schema.DATE:
- return DateType(field_type.nullable)
- elif field_type.type_name == flink_fn_execution_pb2.Schema.TIME:
- return TimeType(field_type.time_info.precision, field_type.nullable)
- elif field_type.type_name == \
- flink_fn_execution_pb2.Schema.LOCAL_ZONED_TIMESTAMP:
- return LocalZonedTimestampType(field_type.local_zoned_timestamp_info.precision,
- field_type.nullable)
- elif field_type.type_name == flink_fn_execution_pb2.Schema.TIMESTAMP:
- return TimestampType(field_type.timestamp_info.precision, field_type.nullable)
- elif field_type.type_name == flink_fn_execution_pb2.Schema.ARRAY:
- return ArrayType(_to_data_type(field_type.collection_element_type),
- field_type.nullable)
- elif field_type.type_name == flink_fn_execution_pb2.Schema.TypeName.ROW:
- return RowType(
- [RowField(f.name, _to_data_type(f.type), f.description)
- for f in field_type.row_schema.fields], field_type.nullable)
- else:
- raise ValueError("field_type %s is not supported." % field_type)
-
- def _to_row_type(row_schema):
- return RowType([RowField(f.name, _to_data_type(f.type)) for f in row_schema.fields])
-
- timezone = pytz.timezone(pipeline_options.view_as(DebugOptions).lookup_experiment(
- "table.exec.timezone"))
- row_type = _to_row_type(schema_proto)
- return ArrowCoder(_to_arrow_schema(row_type), row_type, timezone)
-
- def __repr__(self):
- return 'ArrowCoder[%s]' % self._schema
-
-
-class PassThroughLengthPrefixCoder(LengthPrefixCoder):
- """
- Coder which doesn't prefix the length of the encoded object as the length prefix will be handled
- by the wrapped value coder.
- """
- def __init__(self, value_coder):
- super(PassThroughLengthPrefixCoder, self).__init__(value_coder)
-
- def _create_impl(self):
- return coder_impl.PassThroughLengthPrefixCoderImpl(self._value_coder.get_impl())
-
- def __repr__(self):
- return 'PassThroughLengthPrefixCoder[%s]' % self._value_coder
-
-
-Coder.register_structured_urn(
- common_urns.coders.LENGTH_PREFIX.urn, PassThroughLengthPrefixCoder)
type_name = flink_fn_execution_pb2.Schema
_type_name_mappings = {
@@ -544,8 +367,7 @@ def from_proto(field_type):
if field_type_name == type_name.TIMESTAMP:
return TimestampCoder(field_type.timestamp_info.precision)
if field_type_name == type_name.LOCAL_ZONED_TIMESTAMP:
- timezone = pytz.timezone(pipeline_options.view_as(DebugOptions).lookup_experiment(
- "table.exec.timezone"))
+ timezone = pytz.timezone(os.environ['table.exec.timezone'])
return LocalZonedTimestampCoder(field_type.local_zoned_timestamp_info.precision, timezone)
elif field_type_name == type_name.ARRAY:
return ArrayCoder(from_proto(field_type.collection_element_type))
diff --git a/flink-python/pyflink/fn_execution/fast_coder_impl.pxd b/flink-python/pyflink/fn_execution/fast_coder_impl.pxd
index 2fc7266..3d53e53 100644
--- a/flink-python/pyflink/fn_execution/fast_coder_impl.pxd
+++ b/flink-python/pyflink/fn_execution/fast_coder_impl.pxd
@@ -19,54 +19,23 @@
cimport libc.stdint
-from apache_beam.coders.coder_impl cimport StreamCoderImpl, OutputStream, InputStream
-
-# InputStreamAndFunctionWrapper wraps the user-defined function
-# and input_stream_wrapper in operations
-cdef class InputStreamAndFunctionWrapper:
- # user-defined function
- cdef readonly object func
- cdef InputStreamWrapper input_stream_wrapper
-
-# InputStreamWrapper wraps input_stream and related infos used to decode data
-cdef class InputStreamWrapper:
- cdef InputStream input_stream
- cdef list input_field_coders
- cdef TypeName*input_field_type
- cdef CoderType*input_coder_type
- cdef libc.stdint.int32_t input_field_count
- cdef libc.stdint.int32_t input_leading_complete_bytes_num
- cdef libc.stdint.int32_t input_remaining_bits_num
- cdef size_t input_buffer_size
-
-cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
- cdef readonly StreamCoderImpl _value_coder
-
-cdef class FlattenRowCoderImpl(StreamCoderImpl):
- # the input field coders and related args used to decode input_stream data
- cdef list _input_field_coders
- cdef TypeName*_input_field_type
- cdef CoderType*_input_coder_type
- cdef libc.stdint.int32_t _input_field_count
- cdef libc.stdint.int32_t _input_leading_complete_bytes_num
- cdef libc.stdint.int32_t _input_remaining_bits_num
-
- # the output field coders and related args used to encode data to output_stream
- cdef readonly list _output_field_coders
- cdef TypeName*_output_field_type
- cdef CoderType*_output_coder_type
- cdef libc.stdint.int32_t _output_field_count
- cdef libc.stdint.int32_t _output_leading_complete_bytes_num
- cdef libc.stdint.int32_t _output_remaining_bits_num
+from pyflink.fn_execution.stream cimport InputStream, OutputStream
+
+cdef class BaseCoderImpl:
+ cpdef void encode(self, value, OutputStream output_stream)
+ cpdef decode(self, InputStream input_stream)
+
+cdef class FlattenRowCoderImpl(BaseCoderImpl):
+ cdef readonly list _field_coders
+ cdef TypeName*_field_type
+ cdef CoderType*_field_coder_type
+ cdef size_t _field_count
+ cdef size_t _leading_complete_bytes_num
+ cdef size_t _remaining_bits_num
cdef bint*_null_mask
cdef unsigned char*_null_byte_search_table
- # the char pointer used to store encoded data of output_stream
- cdef char*_output_data
- cdef size_t _output_buffer_size
- cdef size_t _output_pos
-
# the tmp char pointer used to store encoded data of every row
cdef char*_tmp_output_data
cdef size_t _tmp_output_buffer_size
@@ -75,40 +44,24 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
# the char pointer used to map the decoded data of input_stream
cdef char*_input_data
cdef size_t _input_pos
- cdef size_t _input_buffer_size
# used to store the result of Python user-defined function
cdef list row
- # the Python user-defined function
- cdef object func
-
# initial attribute
cdef void _init_attribute(self)
- # wrap input_stream
- cdef InputStreamWrapper _wrap_input_stream(self, InputStream input_stream, size_t size)
-
- cdef void _write_null_mask(self, value, libc.stdint.int32_t leading_complete_bytes_num,
- libc.stdint.int32_t remaining_bits_num)
- cdef void _read_null_mask(self, bint*null_mask, libc.stdint.int32_t leading_complete_bytes_num,
- libc.stdint.int32_t remaining_bits_num)
-
- cdef void _prepare_encode(self, InputStreamAndFunctionWrapper input_stream_and_function_wrapper,
- OutputStream out_stream)
-
- cdef void _maybe_flush(self, OutputStream out_stream)
- # Because output_buffer will be reallocated during encoding data, we need to remap output_buffer
- # to the data pointer of output_stream
- cdef void _map_output_data_to_output_stream(self, OutputStream out_stream)
- cdef void _copy_to_output_buffer(self)
+ cdef void _write_null_mask(self, value, size_t leading_complete_bytes_num,
+ size_t remaining_bits_num)
+ cdef void _read_null_mask(self, bint*null_mask, size_t leading_complete_bytes_num,
+ size_t remaining_bits_num)
# encode data to output_stream
- cdef void _encode_one_row(self, value)
- cdef void _encode_field(self, CoderType coder_type, TypeName field_type, BaseCoder field_coder,
- item)
+ cdef void _encode_one_row(self, value, OutputStream output_stream)
+ cdef void _encode_field(self, CoderType coder_type, TypeName field_type, FieldCoder field_coder,
+ item)
cdef void _encode_field_simple(self, TypeName field_type, item)
- cdef void _encode_field_complex(self, TypeName field_type, BaseCoder field_coder, item)
+ cdef void _encode_field_complex(self, TypeName field_type, FieldCoder field_coder, item)
cdef void _extend(self, size_t missing)
cdef void _encode_byte(self, unsigned char val)
cdef void _encode_smallint(self, libc.stdint.int16_t v)
@@ -119,10 +72,11 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
cdef void _encode_bytes(self, char*b, size_t length)
# decode data from input_stream
- cdef void _decode_next_row(self)
- cdef object _decode_field(self, CoderType coder_type, TypeName field_type, BaseCoder field_coder)
+ cdef void _decode_next_row(self, InputStream input_stream)
+ cdef object _decode_field(self, CoderType coder_type, TypeName field_type,
+ FieldCoder field_coder)
cdef object _decode_field_simple(self, TypeName field_type)
- cdef object _decode_field_complex(self, TypeName field_type, BaseCoder field_coder)
+ cdef object _decode_field_complex(self, TypeName field_type, FieldCoder field_coder)
cdef unsigned char _decode_byte(self) except? -1
cdef libc.stdint.int16_t _decode_smallint(self) except? -1
cdef libc.stdint.int32_t _decode_int(self) except? -1
@@ -132,7 +86,7 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
cdef bytes _decode_bytes(self)
cdef class TableFunctionRowCoderImpl(FlattenRowCoderImpl):
- cdef void _encode_end_message(self)
+ cdef char* _end_message
cdef enum CoderType:
UNDEFINED = -1
@@ -159,59 +113,59 @@ cdef enum TypeName:
MAP = 15
LOCAL_ZONED_TIMESTAMP = 16
-cdef class BaseCoder:
+cdef class FieldCoder:
cpdef CoderType coder_type(self)
cpdef TypeName type_name(self)
-cdef class TinyIntCoderImpl(BaseCoder):
+cdef class TinyIntCoderImpl(FieldCoder):
pass
-cdef class SmallIntCoderImpl(BaseCoder):
+cdef class SmallIntCoderImpl(FieldCoder):
pass
-cdef class IntCoderImpl(BaseCoder):
+cdef class IntCoderImpl(FieldCoder):
pass
-cdef class BigIntCoderImpl(BaseCoder):
+cdef class BigIntCoderImpl(FieldCoder):
pass
-cdef class BooleanCoderImpl(BaseCoder):
+cdef class BooleanCoderImpl(FieldCoder):
pass
-cdef class FloatCoderImpl(BaseCoder):
+cdef class FloatCoderImpl(FieldCoder):
pass
-cdef class DoubleCoderImpl(BaseCoder):
+cdef class DoubleCoderImpl(FieldCoder):
pass
-cdef class BinaryCoderImpl(BaseCoder):
+cdef class BinaryCoderImpl(FieldCoder):
pass
-cdef class CharCoderImpl(BaseCoder):
+cdef class CharCoderImpl(FieldCoder):
pass
-cdef class DateCoderImpl(BaseCoder):
+cdef class DateCoderImpl(FieldCoder):
pass
-cdef class TimeCoderImpl(BaseCoder):
+cdef class TimeCoderImpl(FieldCoder):
pass
-cdef class DecimalCoderImpl(BaseCoder):
+cdef class DecimalCoderImpl(FieldCoder):
cdef readonly object context
cdef readonly object scale_format
-cdef class TimestampCoderImpl(BaseCoder):
+cdef class TimestampCoderImpl(FieldCoder):
cdef readonly bint is_compact
cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl):
cdef readonly object timezone
-cdef class ArrayCoderImpl(BaseCoder):
- cdef readonly BaseCoder elem_coder
+cdef class ArrayCoderImpl(FieldCoder):
+ cdef readonly FieldCoder elem_coder
-cdef class MapCoderImpl(BaseCoder):
- cdef readonly BaseCoder key_coder
- cdef readonly BaseCoder value_coder
+cdef class MapCoderImpl(FieldCoder):
+ cdef readonly FieldCoder key_coder
+ cdef readonly FieldCoder value_coder
-cdef class RowCoderImpl(BaseCoder):
+cdef class RowCoderImpl(FieldCoder):
cdef readonly list field_coders
diff --git a/flink-python/pyflink/fn_execution/fast_coder_impl.pyx b/flink-python/pyflink/fn_execution/fast_coder_impl.pyx
index eb189fd..38481bc 100644
--- a/flink-python/pyflink/fn_execution/fast_coder_impl.pyx
+++ b/flink-python/pyflink/fn_execution/fast_coder_impl.pyx
@@ -20,90 +20,62 @@
# cython: profile=True
# cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True
-cimport libc.stdlib
+from libc.stdlib cimport free, malloc, realloc
+from libc.string cimport memcpy
import datetime
import decimal
from pyflink.table import Row
-cdef class InputStreamAndFunctionWrapper:
- def __cinit__(self, func, input_stream_wrapper):
- self.func = func
- self.input_stream_wrapper = input_stream_wrapper
+cdef class BaseCoderImpl:
+ cpdef void encode(self, value, OutputStream output_stream):
+ pass
-cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
- def __cinit__(self, value_coder):
- self._value_coder = value_coder
-
- cpdef encode_to_stream(self, value, OutputStream out_stream, bint nested):
- self._value_coder.encode_to_stream(value, out_stream, nested)
-
- cpdef decode_from_stream(self, InputStream in_stream, bint nested):
- return self._value_coder.decode_from_stream(in_stream, nested)
-
- cpdef get_estimated_size_and_observables(self, value, bint nested=False):
- return 0, []
+ cpdef decode(self, InputStream input_stream):
+ pass
cdef class TableFunctionRowCoderImpl(FlattenRowCoderImpl):
def __init__(self, flatten_row_coder):
- super(TableFunctionRowCoderImpl, self).__init__(flatten_row_coder._output_field_coders)
-
- cpdef encode_to_stream(self, input_stream_and_function_wrapper, OutputStream out_stream,
- bint nested):
- self._prepare_encode(input_stream_and_function_wrapper, out_stream)
- while self._input_buffer_size > self._input_pos:
- self._decode_next_row()
- result = self.func(self.row)
- if result:
- for value in result:
- if self._output_field_count == 1:
- value = (value,)
- self._encode_one_row(value)
- self._maybe_flush(out_stream)
- self._encode_end_message()
-
- self._map_output_data_to_output_stream(out_stream)
-
- # write 0x00 as end message
- cdef void _encode_end_message(self):
- if self._output_buffer_size < self._output_pos + 2:
- self._extend(2)
- self._output_data[self._output_pos] = 0x01
- self._output_data[self._output_pos + 1] = 0x00
- self._output_pos += 2
+ super(TableFunctionRowCoderImpl, self).__init__(flatten_row_coder._field_coders)
+ self._end_message = <char*> malloc(1)
+ self._end_message[0] = 0x00
+
+ cpdef void encode(self, iter_value, OutputStream output_stream):
+ if iter_value:
+ for value in iter_value:
+ if self._field_count == 1:
+ value = (value,)
+ self._encode_one_row(value, output_stream)
+ # write 0x00 as end message
+ output_stream.write(self._end_message, 1)
-cdef class FlattenRowCoderImpl(StreamCoderImpl):
+ def __dealloc__(self):
+ if self._end_message:
+ free(self._end_message)
+
+cdef class FlattenRowCoderImpl(BaseCoderImpl):
def __init__(self, field_coders):
- self._output_field_coders = field_coders
- self._output_field_count = len(self._output_field_coders)
- self._output_field_type = <TypeName*> libc.stdlib.malloc(
- self._output_field_count * sizeof(TypeName))
- self._output_coder_type = <CoderType*> libc.stdlib.malloc(
- self._output_field_count * sizeof(CoderType))
- self._output_leading_complete_bytes_num = self._output_field_count // 8
- self._output_remaining_bits_num = self._output_field_count % 8
+ self._field_coders = field_coders
+ self._field_count = len(self._field_coders)
+ self._field_type = <TypeName*> malloc(self._field_count * sizeof(TypeName))
+ self._field_coder_type = <CoderType*> malloc(
+ self._field_count * sizeof(CoderType))
+ self._leading_complete_bytes_num = self._field_count // 8
+ self._remaining_bits_num = self._field_count % 8
self._tmp_output_buffer_size = 1024
self._tmp_output_pos = 0
- self._tmp_output_data = <char*> libc.stdlib.malloc(self._tmp_output_buffer_size)
- self._null_byte_search_table = <unsigned char*> libc.stdlib.malloc(
- 8 * sizeof(unsigned char))
+ self._tmp_output_data = <char*> malloc(self._tmp_output_buffer_size)
+ self._null_byte_search_table = <unsigned char*> malloc(8 * sizeof(unsigned char))
+ self._null_mask = <bint*> malloc(self._field_count * sizeof(bint))
self._init_attribute()
+ self.row = [None for _ in range(self._field_count)]
+
+ cpdef void encode(self, value, OutputStream output_stream):
+ self._encode_one_row(value, output_stream)
- cpdef decode_from_stream(self, InputStream in_stream, bint nested):
- cdef InputStreamWrapper input_stream_wrapper
- input_stream_wrapper = self._wrap_input_stream(in_stream, in_stream.size())
- return input_stream_wrapper
-
- cpdef encode_to_stream(self, input_stream_and_function_wrapper, OutputStream out_stream,
- bint nested):
- cdef list result
- self._prepare_encode(input_stream_and_function_wrapper, out_stream)
- while self._input_buffer_size > self._input_pos:
- self._decode_next_row()
- result = self.func(self.row)
- self._encode_one_row(result)
- self._maybe_flush(out_stream)
- self._map_output_data_to_output_stream(out_stream)
+ cpdef decode(self, InputStream input_stream):
+ self._decode_next_row(input_stream)
+ return self.row
cdef void _init_attribute(self):
self._null_byte_search_table[0] = 0x80
@@ -114,46 +86,28 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
self._null_byte_search_table[5] = 0x04
self._null_byte_search_table[6] = 0x02
self._null_byte_search_table[7] = 0x01
- for i in range(self._output_field_count):
- self._output_field_type[i] = self._output_field_coders[i].type_name()
- self._output_coder_type[i] = self._output_field_coders[i].coder_type()
-
- cdef InputStreamWrapper _wrap_input_stream(self, InputStream input_stream, size_t size):
- # wrappers the input field coders and input_stream together
- # so that it can be transposed to operations
- cdef InputStreamWrapper input_stream_wrapper
- input_stream_wrapper = InputStreamWrapper()
- input_stream_wrapper.input_stream = input_stream
- input_stream_wrapper.input_field_coders = self._output_field_coders
- input_stream_wrapper.input_remaining_bits_num = self._output_remaining_bits_num
- input_stream_wrapper.input_leading_complete_bytes_num = \
- self._output_leading_complete_bytes_num
- input_stream_wrapper.input_field_count = self._output_field_count
- input_stream_wrapper.input_field_type = self._output_field_type
- input_stream_wrapper.input_coder_type = self._output_coder_type
- input_stream_wrapper.input_stream.pos = size
- input_stream_wrapper.input_buffer_size = size
- return input_stream_wrapper
-
- cdef void _encode_one_row(self, value):
- cdef libc.stdint.int32_t i
- self._write_null_mask(value, self._output_leading_complete_bytes_num,
- self._output_remaining_bits_num)
- for i in range(self._output_field_count):
+ for i in range(self._field_count):
+ self._field_type[i] = self._field_coders[i].type_name()
+ self._field_coder_type[i] = self._field_coders[i].coder_type()
+
+ cdef void _encode_one_row(self, value, OutputStream output_stream):
+ cdef size_t i
+ self._write_null_mask(value, self._leading_complete_bytes_num, self._remaining_bits_num)
+ for i in range(self._field_count):
item = value[i]
if item is not None:
- if self._output_coder_type[i] == SIMPLE:
- self._encode_field_simple(self._output_field_type[i], item)
+ if self._field_coder_type[i] == SIMPLE:
+ self._encode_field_simple(self._field_type[i], item)
else:
- self._encode_field_complex(self._output_field_type[i],
- self._output_field_coders[i], item)
+ self._encode_field_complex(self._field_type[i], self._field_coders[i], item)
- self._copy_to_output_buffer()
+ output_stream.write(self._tmp_output_data, self._tmp_output_pos)
+ self._tmp_output_pos = 0
cdef void _read_null_mask(self, bint*null_mask,
- libc.stdint.int32_t input_leading_complete_bytes_num,
- libc.stdint.int32_t input_remaining_bits_num):
- cdef libc.stdint.int32_t field_pos, i
+ size_t input_leading_complete_bytes_num,
+ size_t input_remaining_bits_num):
+ cdef size_t field_pos, i
cdef unsigned char b
field_pos = 0
for _ in range(input_leading_complete_bytes_num):
@@ -170,25 +124,25 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
null_mask[field_pos] = (b & self._null_byte_search_table[i]) > 0
field_pos += 1
- cdef void _decode_next_row(self):
- cdef libc.stdint.int32_t i
- # skip prefix variable int length
- while self._input_data[self._input_pos] & 0x80:
- self._input_pos += 1
- self._input_pos += 1
- self._read_null_mask(self._null_mask, self._input_leading_complete_bytes_num,
- self._input_remaining_bits_num)
- for i in range(self._input_field_count):
+ cdef void _decode_next_row(self, InputStream input_stream):
+ cdef size_t i
+ cdef size_t length
+ length = input_stream.read(&self._input_data)
+ self._input_pos = 0
+ self._read_null_mask(self._null_mask, self._leading_complete_bytes_num,
+ self._remaining_bits_num)
+ for i in range(self._field_count):
if self._null_mask[i]:
self.row[i] = None
else:
- if self._input_coder_type[i] == SIMPLE:
- self.row[i] = self._decode_field_simple(self._input_field_type[i])
+ if self._field_coder_type[i] == SIMPLE:
+ self.row[i] = self._decode_field_simple(self._field_type[i])
else:
- self.row[i] = self._decode_field_complex(self._input_field_type[i],
- self._input_field_coders[i])
+ self.row[i] = self._decode_field_complex(self._field_type[i],
+ self._field_coders[i])
- cdef object _decode_field(self, CoderType coder_type, TypeName field_type, BaseCoder field_coder):
+ cdef object _decode_field(self, CoderType coder_type, TypeName field_type,
+ FieldCoder field_coder):
if coder_type == SIMPLE:
return self._decode_field_simple(field_type)
else:
@@ -240,12 +194,12 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
minutes %= 60
return datetime.time(hours, minutes, seconds, milliseconds * 1000)
- cdef object _decode_field_complex(self, TypeName field_type, BaseCoder field_coder):
+ cdef object _decode_field_complex(self, TypeName field_type, FieldCoder field_coder):
cdef libc.stdint.int32_t nanoseconds, microseconds, seconds, length
cdef libc.stdint.int32_t i, row_field_count, leading_complete_bytes_num, remaining_bits_num
cdef libc.stdint.int64_t milliseconds
cdef bint*null_mask
- cdef BaseCoder value_coder, key_coder
+ cdef FieldCoder value_coder, key_coder
cdef TypeName value_type, key_type
cdef CoderType value_coder_type, key_coder_type
cdef list row_field_coders
@@ -287,8 +241,9 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
value_coder = (<ArrayCoderImpl> field_coder).elem_coder
value_type = value_coder.type_name()
value_coder_type = value_coder.coder_type()
- return [self._decode_field(value_coder_type, value_type, value_coder) if self._decode_byte()
- else None for _ in range(length)]
+ return [
+ self._decode_field(value_coder_type, value_type, value_coder) if self._decode_byte()
+ else None for _ in range(length)]
elif field_type == MAP:
# Map
key_coder = (<MapCoderImpl> field_coder).key_coder
@@ -310,7 +265,7 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
# Row
row_field_coders = (<RowCoderImpl> field_coder).field_coders
row_field_count = len(row_field_coders)
- null_mask = <bint*> libc.stdlib.malloc(row_field_count * sizeof(bint))
+ null_mask = <bint*> malloc(row_field_count * sizeof(bint))
leading_complete_bytes_num = row_field_count // 8
remaining_bits_num = row_field_count % 8
self._read_null_mask(null_mask, leading_complete_bytes_num, remaining_bits_num)
@@ -320,7 +275,7 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
row_field_coders[i].type_name(),
row_field_coders[i])
for i in range(row_field_count)])
- libc.stdlib.free(null_mask)
+ free(null_mask)
return row
cdef unsigned char _decode_byte(self) except? -1:
@@ -365,36 +320,8 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
self._input_pos += size
return self._input_data[self._input_pos - size: self._input_pos]
- cdef void _prepare_encode(self, InputStreamAndFunctionWrapper input_stream_and_function_wrapper,
- OutputStream out_stream):
- cdef InputStreamWrapper input_stream_wrapper
- # get the data pointer of output_stream
- self._output_data = out_stream.data
- self._output_pos = out_stream.pos
- self._output_buffer_size = out_stream.buffer_size
- self._tmp_output_pos = 0
-
- input_stream_wrapper = input_stream_and_function_wrapper.input_stream_wrapper
- # get the data pointer of input_stream
- self._input_data = input_stream_wrapper.input_stream.allc
- self._input_buffer_size = input_stream_wrapper.input_buffer_size
-
- # get the infos of input coder which will be used to decode data from input_stream
- self._input_field_count = input_stream_wrapper.input_field_count
- self._input_leading_complete_bytes_num = input_stream_wrapper.input_leading_complete_bytes_num
- self._input_remaining_bits_num = input_stream_wrapper.input_remaining_bits_num
- self._input_field_type = input_stream_wrapper.input_field_type
- self._input_coder_type = input_stream_wrapper.input_coder_type
- self._input_field_coders = input_stream_wrapper.input_field_coders
- self._null_mask = <bint*> libc.stdlib.malloc(self._input_field_count * sizeof(bint))
- self._input_pos = 0
-
- # initial the result row and get the Python user-defined function
- self.row = [None for _ in range(self._input_field_count)]
- self.func = input_stream_and_function_wrapper.func
-
- cdef void _encode_field(self, CoderType coder_type, TypeName field_type, BaseCoder field_coder,
- item):
+ cdef void _encode_field(self, CoderType coder_type, TypeName field_type, FieldCoder field_coder,
+ item):
if coder_type == SIMPLE:
self._encode_field_simple(field_type, item)
else:
@@ -445,14 +372,14 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
milliseconds = hour * 3600000 + minute * 60000 + seconds * 1000 + microsecond // 1000
self._encode_int(milliseconds)
- cdef void _encode_field_complex(self, TypeName field_type, BaseCoder field_coder, item):
+ cdef void _encode_field_complex(self, TypeName field_type, FieldCoder field_coder, item):
cdef libc.stdint.int32_t nanoseconds, microseconds_of_second, length, row_field_count
cdef libc.stdint.int32_t leading_complete_bytes_num, remaining_bits_num
cdef libc.stdint.int64_t timestamp_milliseconds, timestamp_seconds
- cdef BaseCoder value_coder, key_coder
+ cdef FieldCoder value_coder, key_coder
cdef TypeName value_type, key_type
cdef CoderType value_coder_type, key_coder_type
- cdef BaseCoder row_field_coder
+ cdef FieldCoder row_field_coder
cdef list row_field_coders, row_value
if field_type == DECIMAL:
@@ -522,54 +449,12 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
row_field_coder = row_field_coders[i]
if field_item is not None:
self._encode_field(row_field_coder.coder_type(), row_field_coder.type_name(),
- row_field_coder, field_item)
-
- cdef void _copy_to_output_buffer(self):
- cdef size_t size
- cdef size_t i
- cdef bint is_realloc
- cdef char bits
- # the length of the variable prefix length will be less than 9 bytes
- if self._output_buffer_size < self._output_pos + self._tmp_output_pos + 9:
- self._output_buffer_size += self._tmp_output_buffer_size + 9
- self._output_data = <char*> libc.stdlib.realloc(self._output_data,
- self._output_buffer_size)
- size = self._tmp_output_pos
- # write variable prefix length
- while size:
- bits = size & 0x7F
- size >>= 7
- if size:
- bits |= 0x80
- self._output_data[self._output_pos] = bits
- self._output_pos += 1
- if self._tmp_output_pos < 8:
- # This is faster than memcpy when the string is short.
- for i in range(self._tmp_output_pos):
- self._output_data[self._output_pos + i] = self._tmp_output_data[i]
- else:
- libc.string.memcpy(self._output_data + self._output_pos, self._tmp_output_data,
- self._tmp_output_pos)
- self._output_pos += self._tmp_output_pos
- self._tmp_output_pos = 0
-
- cdef void _maybe_flush(self, OutputStream out_stream):
- # Currently, it will trigger flushing when the size of buffer reach to 10_000_000
- if self._output_pos > 10_000_000:
- self._map_output_data_to_output_stream(out_stream)
- out_stream.flush()
- self._output_pos = 0
-
- cdef void _map_output_data_to_output_stream(self, OutputStream out_stream):
- out_stream.data = self._output_data
- out_stream.pos = self._output_pos
- out_stream.buffer_size = self._output_buffer_size
+ row_field_coder, field_item)
cdef void _extend(self, size_t missing):
while self._tmp_output_buffer_size < self._tmp_output_pos + missing:
self._tmp_output_buffer_size *= 2
- self._tmp_output_data = <char*> libc.stdlib.realloc(self._tmp_output_data,
- self._tmp_output_buffer_size)
+ self._tmp_output_data = <char*> realloc(self._tmp_output_data, self._tmp_output_buffer_size)
cdef void _encode_byte(self, unsigned char val):
if self._tmp_output_buffer_size < self._tmp_output_pos + 1:
@@ -621,12 +506,12 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
for i in range(length):
self._tmp_output_data[self._tmp_output_pos + i] = b[i]
else:
- libc.string.memcpy(self._tmp_output_data + self._tmp_output_pos, b, length)
+ memcpy(self._tmp_output_data + self._tmp_output_pos, b, length)
self._tmp_output_pos += length
- cdef void _write_null_mask(self, value, libc.stdint.int32_t leading_complete_bytes_num,
- libc.stdint.int32_t remaining_bits_num):
- cdef libc.stdint.int32_t field_pos, index
+ cdef void _write_null_mask(self, value, size_t leading_complete_bytes_num,
+ size_t remaining_bits_num):
+ cdef size_t field_pos, index
cdef unsigned char*null_byte_search_table
cdef unsigned char b, i
field_pos = 0
@@ -647,95 +532,95 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
self._encode_byte(b)
def __dealloc__(self):
- if self.null_mask:
- libc.stdlib.free(self._null_mask)
- if self.null_byte_search_table:
- libc.stdlib.free(self._null_byte_search_table)
+ if self._null_mask:
+ free(self._null_mask)
+ if self._null_byte_search_table:
+ free(self._null_byte_search_table)
if self._tmp_output_data:
- libc.stdlib.free(self._tmp_output_data)
- if self._output_field_type:
- libc.stdlib.free(self._output_field_type)
- if self._output_coder_type:
- libc.stdlib.free(self._output_coder_type)
+ free(self._tmp_output_data)
+ if self._field_type:
+ free(self._field_type)
+ if self._field_coder_type:
+ free(self._field_coder_type)
-cdef class BaseCoder:
+cdef class FieldCoder:
cpdef CoderType coder_type(self):
return UNDEFINED
cpdef TypeName type_name(self):
return NONE
-cdef class TinyIntCoderImpl(BaseCoder):
+cdef class TinyIntCoderImpl(FieldCoder):
cpdef CoderType coder_type(self):
return SIMPLE
cpdef TypeName type_name(self):
return TINYINT
-cdef class SmallIntCoderImpl(BaseCoder):
+cdef class SmallIntCoderImpl(FieldCoder):
cpdef CoderType coder_type(self):
return SIMPLE
cpdef TypeName type_name(self):
return SMALLINT
-cdef class IntCoderImpl(BaseCoder):
+cdef class IntCoderImpl(FieldCoder):
cpdef CoderType coder_type(self):
return SIMPLE
cpdef TypeName type_name(self):
return INT
-cdef class BigIntCoderImpl(BaseCoder):
+cdef class BigIntCoderImpl(FieldCoder):
cpdef CoderType coder_type(self):
return SIMPLE
cpdef TypeName type_name(self):
return BIGINT
-cdef class BooleanCoderImpl(BaseCoder):
+cdef class BooleanCoderImpl(FieldCoder):
cpdef CoderType coder_type(self):
return SIMPLE
cpdef TypeName type_name(self):
return BOOLEAN
-cdef class FloatCoderImpl(BaseCoder):
+cdef class FloatCoderImpl(FieldCoder):
cpdef CoderType coder_type(self):
return SIMPLE
cpdef TypeName type_name(self):
return FLOAT
-cdef class DoubleCoderImpl(BaseCoder):
+cdef class DoubleCoderImpl(FieldCoder):
cpdef CoderType coder_type(self):
return SIMPLE
cpdef TypeName type_name(self):
return DOUBLE
-cdef class BinaryCoderImpl(BaseCoder):
+cdef class BinaryCoderImpl(FieldCoder):
cpdef CoderType coder_type(self):
return SIMPLE
cpdef TypeName type_name(self):
return BINARY
-cdef class CharCoderImpl(BaseCoder):
+cdef class CharCoderImpl(FieldCoder):
cpdef CoderType coder_type(self):
return SIMPLE
cpdef TypeName type_name(self):
return CHAR
-cdef class DateCoderImpl(BaseCoder):
+cdef class DateCoderImpl(FieldCoder):
cpdef CoderType coder_type(self):
return SIMPLE
cpdef TypeName type_name(self):
return DATE
-cdef class TimeCoderImpl(BaseCoder):
+cdef class TimeCoderImpl(FieldCoder):
cpdef CoderType coder_type(self):
return SIMPLE
cpdef TypeName type_name(self):
return TIME
-cdef class DecimalCoderImpl(BaseCoder):
+cdef class DecimalCoderImpl(FieldCoder):
def __cinit__(self, precision, scale):
self.context = decimal.Context(prec=precision)
self.scale_format = decimal.Decimal(10) ** -scale
@@ -746,7 +631,7 @@ cdef class DecimalCoderImpl(BaseCoder):
cpdef TypeName type_name(self):
return DECIMAL
-cdef class TimestampCoderImpl(BaseCoder):
+cdef class TimestampCoderImpl(FieldCoder):
def __init__(self, precision):
self.is_compact = precision <= 3
@@ -767,7 +652,7 @@ cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl):
cpdef TypeName type_name(self):
return LOCAL_ZONED_TIMESTAMP
-cdef class ArrayCoderImpl(BaseCoder):
+cdef class ArrayCoderImpl(FieldCoder):
def __cinit__(self, elem_coder):
self.elem_coder = elem_coder
@@ -777,7 +662,7 @@ cdef class ArrayCoderImpl(BaseCoder):
cpdef TypeName type_name(self):
return ARRAY
-cdef class MapCoderImpl(BaseCoder):
+cdef class MapCoderImpl(FieldCoder):
def __cinit__(self, key_coder, value_coder):
self.key_coder = key_coder
self.value_coder = value_coder
@@ -788,7 +673,7 @@ cdef class MapCoderImpl(BaseCoder):
cpdef TypeName type_name(self):
return MAP
-cdef class RowCoderImpl(BaseCoder):
+cdef class RowCoderImpl(FieldCoder):
def __cinit__(self, field_coders):
self.field_coders = field_coders
@@ -796,4 +681,4 @@ cdef class RowCoderImpl(BaseCoder):
return COMPLEX
cpdef TypeName type_name(self):
- return ROW
\ No newline at end of file
+ return ROW
diff --git a/flink-python/pyflink/fn_execution/fast_operations.pxd b/flink-python/pyflink/fn_execution/fast_operations.pxd
index e551827..ba0e7fe 100644
--- a/flink-python/pyflink/fn_execution/fast_operations.pxd
+++ b/flink-python/pyflink/fn_execution/fast_operations.pxd
@@ -19,11 +19,14 @@
cimport libc.stdint
from apache_beam.runners.worker.operations cimport Operation
-from apache_beam.coders.coder_impl cimport StreamCoderImpl, CoderImpl, OutputStream, InputStream
+from apache_beam.coders.coder_impl cimport StreamCoderImpl
+
+from pyflink.fn_execution.fast_coder_impl cimport BaseCoderImpl
cdef class StatelessFunctionOperation(Operation):
cdef Operation consumer
cdef StreamCoderImpl _value_coder_impl
+ cdef BaseCoderImpl _output_coder
cdef dict variable_dict
cdef list user_defined_funcs
cdef libc.stdint.int32_t _func_num
diff --git a/flink-python/pyflink/fn_execution/fast_operations.pyx b/flink-python/pyflink/fn_execution/fast_operations.pyx
index b7a0a34..25bbbaa 100644
--- a/flink-python/pyflink/fn_execution/fast_operations.pyx
+++ b/flink-python/pyflink/fn_execution/fast_operations.pyx
@@ -25,8 +25,9 @@ import cloudpickle
from apache_beam.runners.worker import bundle_processor
from apache_beam.runners.worker import operation_specs
from apache_beam.utils.windowed_value cimport WindowedValue
-from pyflink.fn_execution.fast_coder_impl cimport InputStreamAndFunctionWrapper
+from pyflink.fn_execution.beam.beam_stream cimport BeamInputStream, BeamOutputStream
+from pyflink.fn_execution.beam.beam_coder_impl cimport InputStreamWrapper
from pyflink.fn_execution import flink_fn_execution_pb2
from pyflink.metrics.metricbase import GenericMetricGroup
from pyflink.serializers import PickleSerializer
@@ -45,13 +46,13 @@ cdef class StatelessFunctionOperation(Operation):
def __init__(self, name, spec, counter_factory, sampler, consumers):
super(StatelessFunctionOperation, self).__init__(name, spec, counter_factory, sampler)
self.consumer = consumers['output'][0]
- self._value_coder_impl = self.consumer.windowed_coder.wrapped_value_coder.get_impl()
- value_coder = self._value_coder_impl._value_coder
- from pyflink.fn_execution.coder_impl import ArrowCoderImpl
- if isinstance(value_coder, ArrowCoderImpl):
+ self._value_coder_impl = self.consumer.windowed_coder.wrapped_value_coder.get_impl()._value_coder
+ from pyflink.fn_execution.beam.beam_slow_coder_impl import ArrowCoderImpl
+ if isinstance(self._value_coder_impl, ArrowCoderImpl):
self._is_python_coder = True
else:
self._is_python_coder = False
+ self._output_coder = self._value_coder_impl._value_coder
self.variable_dict = {}
self.user_defined_funcs = []
@@ -80,15 +81,25 @@ cdef class StatelessFunctionOperation(Operation):
user_defined_func.close()
cpdef process(self, WindowedValue o):
- cdef InputStreamAndFunctionWrapper wrapper
+ cdef InputStreamWrapper input_stream_wrapper
+ cdef BeamInputStream input_stream
+ cdef BaseCoderImpl input_coder
+ cdef BeamOutputStream output_stream
with self.scoped_process_state:
- output_stream = self.consumer.output_stream
if self._is_python_coder:
- self._value_coder_impl.encode_to_stream(self.func(o.value), output_stream, True)
+ self._value_coder_impl.encode_to_stream(
+ self.func(o.value), self.consumer.output_stream, True)
+ self.consumer.output_stream.maybe_flush()
else:
- wrapper = InputStreamAndFunctionWrapper(self.func, o.value)
- self._value_coder_impl.encode_to_stream(wrapper, output_stream, True)
- output_stream.maybe_flush()
+ input_stream_wrapper = o.value
+ input_stream = input_stream_wrapper._input_stream
+ input_coder = input_stream_wrapper._value_coder
+ output_stream = BeamOutputStream(self.consumer.output_stream)
+ while input_stream.available():
+ input_data = input_coder.decode(input_stream)
+ result = self.func(input_data)
+ self._output_coder.encode(result, output_stream)
+ output_stream.flush()
def progress_metrics(self):
metrics = super(StatelessFunctionOperation, self).progress_metrics()
diff --git a/flink-python/pyflink/fn_execution/sdk_worker_main.py b/flink-python/pyflink/fn_execution/stream.pxd
similarity index 56%
copy from flink-python/pyflink/fn_execution/sdk_worker_main.py
copy to flink-python/pyflink/fn_execution/stream.pxd
index 5e750af..e2462ff 100644
--- a/flink-python/pyflink/fn_execution/sdk_worker_main.py
+++ b/flink-python/pyflink/fn_execution/stream.pxd
@@ -15,27 +15,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-import os
-import sys
+# cython: language_level = 3
-# force to register the operations to SDK Harness
-from apache_beam.options.pipeline_options import PipelineOptions
+cdef class InputStream:
+ cdef size_t read(self, char**data)
+ cdef size_t available(self)
-try:
- import pyflink.fn_execution.fast_operations
-except ImportError:
- import pyflink.fn_execution.operations
-
-# force to register the coders to SDK Harness
-import pyflink.fn_execution.coders # noqa # pylint: disable=unused-import
-
-import apache_beam.runners.worker.sdk_worker_main
-
-if 'PIPELINE_OPTIONS' in os.environ:
- pipeline_options = apache_beam.runners.worker.sdk_worker_main._parse_pipeline_options(
- os.environ['PIPELINE_OPTIONS'])
-else:
- pipeline_options = PipelineOptions.from_dictionary({})
-
-if __name__ == '__main__':
- apache_beam.runners.worker.sdk_worker_main.main(sys.argv)
+cdef class OutputStream:
+ cdef void write(self, char*data, size_t length)
+ cpdef void flush(self)
diff --git a/flink-python/pyflink/fn_execution/sdk_worker_main.py b/flink-python/pyflink/fn_execution/stream.pyx
similarity index 56%
rename from flink-python/pyflink/fn_execution/sdk_worker_main.py
rename to flink-python/pyflink/fn_execution/stream.pyx
index 5e750af..895c7c6 100644
--- a/flink-python/pyflink/fn_execution/sdk_worker_main.py
+++ b/flink-python/pyflink/fn_execution/stream.pyx
@@ -15,27 +15,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-import os
-import sys
+# cython: language_level = 3
-# force to register the operations to SDK Harness
-from apache_beam.options.pipeline_options import PipelineOptions
+cdef class InputStream:
+ cdef size_t read(self, char** data):
+ pass
+ cdef size_t available(self):
+ pass
-try:
- import pyflink.fn_execution.fast_operations
-except ImportError:
- import pyflink.fn_execution.operations
-
-# force to register the coders to SDK Harness
-import pyflink.fn_execution.coders # noqa # pylint: disable=unused-import
-
-import apache_beam.runners.worker.sdk_worker_main
-
-if 'PIPELINE_OPTIONS' in os.environ:
- pipeline_options = apache_beam.runners.worker.sdk_worker_main._parse_pipeline_options(
- os.environ['PIPELINE_OPTIONS'])
-else:
- pipeline_options = PipelineOptions.from_dictionary({})
-
-if __name__ == '__main__':
- apache_beam.runners.worker.sdk_worker_main.main(sys.argv)
+cdef class OutputStream:
+ cdef void write(self, char*data, size_t length):
+ pass
+ cpdef void flush(self):
+ pass
diff --git a/flink-python/pyflink/fn_execution/tests/test_coders.py b/flink-python/pyflink/fn_execution/tests/test_coders.py
index 3134e83..eb6fba8 100644
--- a/flink-python/pyflink/fn_execution/tests/test_coders.py
+++ b/flink-python/pyflink/fn_execution/tests/test_coders.py
@@ -38,12 +38,13 @@ except ImportError:
class CodersTest(unittest.TestCase):
def check_coder(self, coder, *values):
+ coder_impl = coder.get_impl()
for v in values:
if isinstance(v, float):
from pyflink.table.tests.test_udf import float_equal
- assert float_equal(v, coder.decode(coder.encode(v)), 1e-6)
+ assert float_equal(v, coder_impl.decode(coder_impl.encode(v)), 1e-6)
else:
- self.assertEqual(v, coder.decode(coder.encode(v)))
+ self.assertEqual(v, coder_impl.decode(coder_impl.encode(v)))
# decide whether two floats are equal
@staticmethod
@@ -137,7 +138,7 @@ class CodersTest(unittest.TestCase):
def test_flatten_row_coder(self):
field_coder = BigIntCoder()
field_count = 10
- coder = FlattenRowCoder([field_coder for _ in range(field_count)])
+ coder = FlattenRowCoder([field_coder for _ in range(field_count)]).get_impl()
v = [[None if i % 2 == 0 else i for i in range(field_count)]]
generator_result = coder.decode(coder.encode(v))
result = []
diff --git a/flink-python/pyflink/fn_execution/tests/test_fast_coders.py b/flink-python/pyflink/fn_execution/tests/test_fast_coders.py
index 38ea3be..a3491ea 100644
--- a/flink-python/pyflink/fn_execution/tests/test_fast_coders.py
+++ b/flink-python/pyflink/fn_execution/tests/test_fast_coders.py
@@ -20,10 +20,9 @@
import logging
import unittest
-from pyflink.fn_execution import coder_impl
-
try:
from pyflink.fn_execution import fast_coder_impl
+ from pyflink.fn_execution.beam import beam_slow_coder_impl as coder_impl
have_cython = True
except ImportError:
@@ -35,18 +34,19 @@ class CodersTest(unittest.TestCase):
def check_cython_coder(self, python_field_coders, cython_field_coders, data):
from apache_beam.coders.coder_impl import create_InputStream, create_OutputStream
- from pyflink.fn_execution.fast_coder_impl import InputStreamAndFunctionWrapper
+ from pyflink.fn_execution.beam.beam_stream import BeamInputStream, BeamOutputStream
py_flatten_row_coder = coder_impl.FlattenRowCoderImpl(python_field_coders)
internal = py_flatten_row_coder.encode(data)
- input_stream = create_InputStream(internal)
- output_stream = create_OutputStream()
+ beam_input_stream = create_InputStream(internal)
+ input_stream = BeamInputStream(beam_input_stream, beam_input_stream.size())
+ beam_output_stream = create_OutputStream()
cy_flatten_row_coder = fast_coder_impl.FlattenRowCoderImpl(cython_field_coders)
- value = cy_flatten_row_coder.decode_from_stream(input_stream, False)
- wrapper_func_input_element = InputStreamAndFunctionWrapper(
- lambda v: [v[i] for i in range(len(v))], value)
- cy_flatten_row_coder.encode_to_stream(wrapper_func_input_element, output_stream, False)
+ value = cy_flatten_row_coder.decode(input_stream)
+ output_stream = BeamOutputStream(beam_output_stream)
+ cy_flatten_row_coder.encode(value, output_stream)
+ output_stream.flush()
generator_result = py_flatten_row_coder.decode_from_stream(create_InputStream(
- output_stream.get()), False)
+ beam_output_stream.get()), False)
result = []
for item in generator_result:
result.append(item)
diff --git a/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py b/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
index 9b8e785..af80ed4 100644
--- a/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
+++ b/flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
@@ -126,7 +126,10 @@ class PythonBootTests(PyFlinkTestCase):
fn_execution_dir = os.path.join(pyflink_dir, "fn_execution")
os.mkdir(fn_execution_dir)
open(os.path.join(fn_execution_dir, "__init__.py"), 'a').close()
- with open(os.path.join(fn_execution_dir, "boot.py"), "w") as f:
+ beam_dir = os.path.join(fn_execution_dir, "beam")
+ os.mkdir(beam_dir)
+ open(os.path.join(beam_dir, "__init__.py"), 'a').close()
+ with open(os.path.join(beam_dir, "beam_boot.py"), "w") as f:
f.write("import os\nwith open(r'%s', 'w') as f:\n f.write(os.getcwd())" %
output_file)
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 22ca08a..fc78723 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -68,7 +68,19 @@ else:
Extension(
name="pyflink.fn_execution.fast_operations",
sources=["pyflink/fn_execution/fast_operations.pyx"],
- include_dirs=["pyflink/fn_execution/"])
+ include_dirs=["pyflink/fn_execution/"]),
+ Extension(
+ name="pyflink.fn_execution.stream",
+ sources=["pyflink/fn_execution/stream.pyx"],
+ include_dirs=["pyflink/fn_execution/"]),
+ Extension(
+ name="pyflink.fn_execution.beam.beam_stream",
+ sources=["pyflink/fn_execution/beam/beam_stream.pyx"],
+ include_dirs=["pyflink/fn_execution/beam"]),
+ Extension(
+ name="pyflink.fn_execution.beam.beam_coder_impl",
+ sources=["pyflink/fn_execution/beam/beam_coder_impl.pyx"],
+ include_dirs=["pyflink/fn_execution/beam"]),
])
except ImportError:
if os.path.exists("pyflink/fn_execution/fast_coder_impl.c"):
@@ -80,7 +92,19 @@ else:
Extension(
name="pyflink.fn_execution.fast_operations",
sources=["pyflink/fn_execution/fast_operations.c"],
- include_dirs=["pyflink/fn_execution/"])
+ include_dirs=["pyflink/fn_execution/"]),
+ Extension(
+ name="pyflink.fn_execution.stream",
+ sources=["pyflink/fn_execution/stream.c"],
+ include_dirs=["pyflink/fn_execution/"]),
+ Extension(
+ name="pyflink.fn_execution.beam.beam_stream",
+ sources=["pyflink/fn_execution/beam/beam_stream.c"],
+ include_dirs=["pyflink/fn_execution/beam"]),
+ Extension(
+ name="pyflink.fn_execution.beam.beam_coder_impl",
+ sources=["pyflink/fn_execution/beam/beam_coder_impl.c"],
+ include_dirs=["pyflink/fn_execution/beam"]),
])
else:
extensions = ([])
@@ -238,6 +262,7 @@ run sdist.
'pyflink.dataset',
'pyflink.common',
'pyflink.fn_execution',
+ 'pyflink.fn_execution.beam',
'pyflink.metrics',
'pyflink.ml',
'pyflink.ml.api',
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamPythonFunctionRunner.java
index 0aa9ee0..2426fed 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamPythonFunctionRunner.java
@@ -42,7 +42,6 @@ import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
-import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
@@ -154,11 +153,6 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
PipelineOptionsFactory.as(PortablePipelineOptions.class);
// one operator has one Python SDK harness
portableOptions.setSdkWorkerParallelism(1);
- ExperimentalOptions experimentalOptions = portableOptions.as(ExperimentalOptions.class);
- for (Map.Entry<String, String> entry : jobOptions.entrySet()) {
- ExperimentalOptions.addExperiment(experimentalOptions,
- String.join("=", entry.getKey(), entry.getValue()));
- }
Struct pipelineOptions = PipelineOptionsTranslation.toProto(portableOptions);
@@ -219,11 +213,13 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
PythonEnvironment environment = environmentManager.createEnvironment();
if (environment instanceof ProcessPythonEnvironment) {
ProcessPythonEnvironment processEnvironment = (ProcessPythonEnvironment) environment;
+ Map<String, String> env = processEnvironment.getEnv();
+ env.putAll(jobOptions);
return Environments.createProcessEnvironment(
"",
"",
processEnvironment.getCommand(),
- processEnvironment.getEnv());
+ env);
}
throw new RuntimeException("Currently only ProcessPythonEnvironment is supported.");
}