You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/16 11:06:25 UTC

[GitHub] [flink] dianfu commented on a change in pull request #11767: [FLINK-17119][python] Add Cython support for composite types

dianfu commented on a change in pull request #11767: [FLINK-17119][python] Add Cython support for composite types
URL: https://github.com/apache/flink/pull/11767#discussion_r409466671
 
 

 ##########
 File path: flink-python/pyflink/fn_execution/fast_coder_impl.pyx
 ##########
 @@ -340,6 +444,86 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
             milliseconds = hour * 3600000 + minute * 60000 + seconds * 1000 + microsecond // 1000
             self._encode_int(milliseconds)
 
+    cdef void _encode_field_complex(self, TypeName field_type, BaseCoder field_coder, item):
+        cdef libc.stdint.int32_t nanoseconds, microseconds_of_second, length, row_field_count
+        cdef libc.stdint.int32_t leading_complete_bytes_num, remaining_bits_num
+        cdef libc.stdint.int64_t timestamp_milliseconds, timestamp_seconds
+        cdef BaseCoder value_coder, key_coder
+        cdef TypeName value_type, key_type
+        cdef CoderType value_coder_type, key_coder_type
+        cdef BaseCoder row_field_coder
+        cdef list row_field_coders, row_value
+
+        if field_type == DECIMAL:
+            # decimal
+            user_context = decimal.getcontext()
+            decimal.setcontext((<DecimalCoderImpl> field_coder).context)
+            bytes_value = str(item.quantize((<DecimalCoderImpl> field_coder).scale_format)).encode(
+                "utf-8")
+            self._encode_bytes(bytes_value)
+            decimal.setcontext(user_context)
+        elif field_type == TIMESTAMP or field_type == LOCAL_ZONED_TIMESTAMP:
+            # Timestamp
+            timestamp_seconds = <libc.stdint.int64_t> (
+                item.replace(tzinfo=datetime.timezone.utc).timestamp())
+            microseconds_of_second = item.microsecond
+            timestamp_milliseconds = timestamp_seconds * 1000 + microseconds_of_second // 1000
+            nanoseconds = microseconds_of_second % 1000 * 1000
+            if field_coder.is_compact:
+                self._encode_bigint(timestamp_milliseconds)
+            else:
+                self._encode_bigint(timestamp_milliseconds)
+                self._encode_int(nanoseconds)
+        elif field_type == ARRAY:
+            # Array
+            length = len(item)
+            value_coder = (<ArrayCoderImpl> field_coder).elem_coder
+            value_type = value_coder.type_name()
+            value_coder_type = value_coder.coder_type()
+            self._encode_int(length)
+            for i in range(length):
+                value = item[i]
+                if value is None:
+                    self._encode_byte(False)
+                else:
+                    self._encode_byte(True)
+                    self._encode_field(value_coder_type, value_type, value_coder, value)
+
 
 Review comment:
   Remove this empty line (the blank line added after the ARRAY branch).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services