Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/05/16 04:47:36 UTC

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8420: [FLINK-12408][python] Allow to define the data types in Python

sunjincheng121 commented on a change in pull request #8420: [FLINK-12408][python] Allow to define the data types in Python
URL: https://github.com/apache/flink/pull/8420#discussion_r284533178
 
 

 ##########
 File path: flink-python/pyflink/table/types.py
 ##########
 @@ -15,172 +15,1803 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
+import base64
+import calendar
+import ctypes
+import datetime
+import decimal
+import json
 import sys
+import time
+from array import array
+from copy import copy
+from functools import reduce
+from enum import Enum
+
+from pyflink.java_gateway import get_gateway
+from pyflink.serializers import PickleSerializer
 
-if sys.version > '3':
-    xrange = range
+if sys.version >= '3':
+    long = int
+    basestring = unicode = str
 
-__all__ = ['DataTypes']
+__all__ = ['DataTypes', 'UserDefinedType', 'Row']
 
 
 class DataType(object):
     """
     Base class for data types.
+
+    :param nullable: boolean, whether the type can be null (None) or not.
     """
-    @classmethod
-    def type_name(cls):
-        return cls.__name__[:-4].lower()
+
+    def __init__(self, nullable=True):
+        self._nullable = nullable  # underscore-prefixed so it does not shadow the nullable() method
+        self.conversion_cls = ''
+
+    def __repr__(self):
+        return '%s(%s)' % (self.__class__.__name__, str(self._nullable).lower())
 
     def __hash__(self):
-        return hash(self.type_name())
+        return hash(str(self))
 
     def __eq__(self, other):
-        return self.type_name() == other.type_name()
+        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
 
     def __ne__(self, other):
-        return self.type_name() != other.type_name()
+        return not self.__eq__(other)
 
+    def not_null(self):
+        cp = copy(self)
+        cp._nullable = False
+        return cp
 
-class DataTypeSingleton(type):
-    """
-    Metaclass for DataType
-    """
+    def nullable(self):
+        cp = copy(self)
+        cp._nullable = True
+        return cp
+
+    @classmethod
+    def type_name(cls):
+        return cls.__name__[:-4].upper()
+
+    def json_value(self):
+        return {"type": self.type_name(),
+                "nullable": self.nullable,
+                "conversion_cls": self.conversion_cls}
 
-    _instances = {}
+    def json(self):
+        return json.dumps(self.json_value(),
+                          separators=(',', ':'),
+                          sort_keys=True)
 
-    def __call__(cls):
-        if cls not in cls._instances:
-            cls._instances[cls] = super(DataTypeSingleton, cls).__call__()
-        return cls._instances[cls]
+    def bridged_to(self, conversion_cls):
+        """
+        Adds a hint that data should be represented using the given class when entering or leaving
+        the table ecosystem.
+
+        :param conversion_cls: the string representation of the conversion class
+        """
+        self.conversion_cls = conversion_cls
+
+    def need_conversion(self):
+        """
+        Does this type need conversion between Python objects and internal SQL objects?
+
+        This is used to avoid unnecessary conversion for ArrayType/MapType/RowType.
+        """
+        return False
+
+    def to_sql_type(self, obj):
+        """
+        Converts a Python object into an internal SQL object.
+        """
+        return obj
+
+    def from_sql_type(self, obj):
+        """
+        Converts an internal SQL object into a native Python object.
+        """
+        return obj
 
 
 class AtomicType(DataType):
     """
     An internal type used to represent everything that is not
-    null, arrays, structs, and maps.
+    an array, a row or a map.
+    """
+
+    def __init__(self, nullable=True):
+        super(AtomicType, self).__init__(nullable)
+
+
+class NullType(AtomicType):
     """
+    Null type.
+
+    The data type representing None.
+    """
+
+    def __init__(self):
+        super(NullType, self).__init__(True)
 
 
 class NumericType(AtomicType):
     """
     Numeric data types.
     """
 
+    def __init__(self, nullable=True):
+        super(NumericType, self).__init__(nullable)
+
 
 class IntegralType(NumericType):
     """
     Integral data types.
     """
 
-    __metaclass__ = DataTypeSingleton
+    def __init__(self, nullable=True):
+        super(IntegralType, self).__init__(nullable)
 
 
 class FractionalType(NumericType):
     """
     Fractional data types.
     """
 
+    def __init__(self, nullable=True):
+        super(FractionalType, self).__init__(nullable)
+
 
-class StringType(AtomicType):
+class CharType(AtomicType):
     """
-    String data type.  SQL VARCHAR
+    Char data type. SQL CHAR(n)
+
+    The serialized string representation is 'char(n)' where 'n' (default: 1) is the number of
+    code points. 'n' must have a value between 1 and 255 (both inclusive).
+
+    :param length: int, the string representation length.
+    :param nullable: boolean, whether the type can be null (None) or not.
     """
 
-    __metaclass__ = DataTypeSingleton
+    def __init__(self, length=1, nullable=True):
+        super(CharType, self).__init__(nullable)
+        self.length = length
 
+    def __repr__(self):
+        return 'CharType(%d, %s)' % (self.length, str(self._nullable).lower())
 
-class BooleanType(AtomicType):
+    def json_value(self):
+        j = super(CharType, self).json_value()
+        j.update({'length': self.length})
+        return j
+
+
+class VarCharType(AtomicType):
     """
-    Boolean data types. SQL BOOLEAN
+    Varchar data type. SQL VARCHAR(n)
+
+    The serialized string representation is 'varchar(n)' where 'n' (default: 1) is the number of
+    characters. 'n' must have a value between 1 and 0x7fffffff (both inclusive).
+
+    :param length: int, the maximum string representation length.
+    :param nullable: boolean, whether the type can be null (None) or not.
     """
 
-    __metaclass__ = DataTypeSingleton
+    def __init__(self, length=1, nullable=True):
+        super(VarCharType, self).__init__(nullable)
+        self.length = length
+
+    def __repr__(self):
+        return "VarCharType(%d, %s)" % (self.length, str(self.nullable).lower())
+
+    def json_value(self):
+        j = super(VarCharType, self).json_value()
+        j.update({"length": self.length})
+        return j
 
 
-class ByteType(IntegralType):
+class BinaryType(AtomicType):
     """
-    Byte data type. SQL TINYINT
+    Binary (byte array) data type. SQL BINARY(n)
+
+    The serialized string representation is 'binary(n)' where 'n' (default: 1) is the number of
+    bytes. 'n' must have a value between 1 and 0x7fffffff (both inclusive).
+
+    :param length: int, the number of bytes.
+    :param nullable: boolean, whether the type can be null (None) or not.
     """
 
+    def __init__(self, length=1, nullable=True):
+        super(BinaryType, self).__init__(nullable)
+        self.length = length
+
+    def __repr__(self):
+        return "BinaryType(%d, %s)" % (self.length, str(self.nullable).lower())
 
-class CharType(IntegralType):
+    def json_value(self):
+        j = super(BinaryType, self).json_value()
+        j.update({"length": self.length})
+        return j
+
+
+class VarBinaryType(AtomicType):
     """
-    Char data type. SQL CHAR
+    Binary (byte array) data type. SQL VARBINARY(n)
+
+    The serialized string representation is 'varbinary(n)' where 'n' (default: 1) is the
+    maximum number of bytes. 'n' must have a value between 1 and 0x7fffffff (both inclusive).
+
+    :param length: int, the maximum number of bytes.
+    :param nullable: boolean, whether the type can be null (None) or not.
     """
 
+    def __init__(self, length=1, nullable=True):
+        super(VarBinaryType, self).__init__(nullable)
+        self.length = length
+
+    def __repr__(self):
+        return "VarBinaryType(%d, %s)" % (self.length, str(self.nullable).lower())
+
+    def json_value(self):
+        j = super(VarBinaryType, self).json_value()
+        j.update({"length": self.length})
+        return j
 
-class ShortType(IntegralType):
+
+class BooleanType(AtomicType):
     """
-    Short data types.  SQL SMALLINT (16bits)
+    Boolean data type. SQL BOOLEAN
+
+    :param nullable: boolean, whether the field can be null (None) or not.
+    """
+
+    def __init__(self, nullable=True):
+        super(BooleanType, self).__init__(nullable)
+
+
+class TinyIntType(IntegralType):
+    """
+    Byte data type. SQL TINYINT (8bits)
+
+    :param nullable: boolean, whether the field can be null (None) or not.
+    """
+
+    def __init__(self, nullable=True):
+        super(TinyIntType, self).__init__(nullable)
+
+
+class SmallIntType(IntegralType):
+    """
+    Short data type. SQL SMALLINT (16bits)
+
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
+    def __init__(self, nullable=True):
+        super(SmallIntType, self).__init__(nullable)
+
 
-class IntegerType(IntegralType):
+class IntType(IntegralType):
     """
     Int data types. SQL INT (32bits)
+
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
+    def __init__(self, nullable=True):
+        super(IntType, self).__init__(nullable)
 
-class LongType(IntegralType):
+
+class BigIntType(IntegralType):
     """
     Long data types. SQL BIGINT (64bits)
+
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
+    def __init__(self, nullable=True):
+        super(BigIntType, self).__init__(nullable)
+
 
 class FloatType(FractionalType):
     """
     Float data type. SQL FLOAT
+
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
-    __metaclass__ = DataTypeSingleton
+    def __init__(self, nullable=True):
+        super(FloatType, self).__init__(nullable)
 
 
 class DoubleType(FractionalType):
     """
     Double data type. SQL DOUBLE
+
+    :param nullable: boolean, whether the field can be null (None) or not.
+    """
+
+    def __init__(self, nullable=True):
+        super(DoubleType, self).__init__(nullable)
+
+
+class DecimalType(FractionalType):
     """
+    Decimal (decimal.Decimal) data type.
 
-    __metaclass__ = DataTypeSingleton
+    The DecimalType must have fixed precision (the maximum total number of digits)
+    and scale (the number of digits to the right of the decimal point). For example,
+    (5, 2) can support values in the range [-999.99, 999.99].
+
+    The precision can be up to 38; the scale must be less than or equal to the precision.
+
+    When creating a DecimalType, the default precision and scale are (10, 0). When
+    inferring a schema from decimal.Decimal objects, it will be DecimalType(38, 18).
+
+    :param precision: the maximum total number of digits (default: 10)
+    :param scale: the number of digits to the right of the decimal point (default: 0)
+    :param nullable: boolean, whether the field can be null (None) or not.
+    """
+
+    def __init__(self, precision=10, scale=0, nullable=True):
+        super(DecimalType, self).__init__(nullable)
+        assert 1 <= precision <= 38
+        assert 0 <= scale <= precision
+        self.precision = precision
+        self.scale = scale
+        self.has_precision_info = True  # this is public API
+
+    def __repr__(self):
+        return "DecimalType(%d, %d, %s)" % (self.precision, self.scale, str(self.nullable).lower())
+
+    def json_value(self):
+        j = super(DecimalType, self).json_value()
+        j.update({"precision": self.precision, "scale": self.scale})
+        return j
 
 
 class DateType(AtomicType):
     """
     Date data type.  SQL DATE
+
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
-    __metaclass__ = DataTypeSingleton
+    def __init__(self, nullable=True):
+        super(DateType, self).__init__(nullable)
+
+    EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()
+
+    def need_conversion(self):
+        return True
+
+    def to_sql_type(self, d):
+        if d is not None:
+            return d.toordinal() - self.EPOCH_ORDINAL
+
+    def from_sql_type(self, v):
+        if v is not None:
+            return datetime.date.fromordinal(v + self.EPOCH_ORDINAL)
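
A quick round trip through the hooks above (editorial sketch, not part of the change): dates are stored internally as days since the Unix epoch.

    >>> date_type = DateType()
    >>> date_type.to_sql_type(datetime.date(1970, 1, 2))
    1
    >>> date_type.from_sql_type(1)
    datetime.date(1970, 1, 2)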
 
 
 class TimeType(AtomicType):
     """
     Time data type. SQL TIME
+
+    The precision must be greater than or equal to 0 and less than or equal to 9.
+
+    :param precision: int, the number of digits of fractional seconds (default: 0)
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
-    __metaclass__ = DataTypeSingleton
+    def __init__(self, precision=0, nullable=True):
+        super(TimeType, self).__init__(nullable)
+        assert 0 <= precision <= 9
+        self.precision = precision
+
+    def __repr__(self):
+        return "TimeType(%s, %s)" % (self.precision, str(self.nullable).lower())
+
+    def json_value(self):
+        j = super(TimeType, self).json_value()
+        j.update({"precision": self.precision})
+        return j
+
+    def need_conversion(self):
+        return True
+
+    def to_sql_type(self, t):
+        # default to a zero offset for naive times so that 'offset' is always bound
+        offset = datetime.timedelta()
+        if t.tzinfo is not None:
+            offset = t.utcoffset() or datetime.timedelta()
+        minutes = t.hour * 60 + t.minute
+        seconds = minutes * 60 + t.second
+        offset_microseconds = (offset.days * 86400 + offset.seconds) * 10**6 + offset.microseconds
+        return seconds * 10**6 + t.microsecond - offset_microseconds
+
+    def from_sql_type(self, t):
+        if t is not None:
+            seconds, microseconds = divmod(t, 10**6)
+            minutes, seconds = divmod(seconds, 60)
+            hours, minutes = divmod(minutes, 60)
+            return datetime.time(hours, minutes, seconds, microseconds)
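
Sketch of the round trip (assuming the zero-offset default for naive times above): times are stored internally as microseconds of the day in UTC.

    >>> time_type = TimeType(precision=3)
    >>> time_type.to_sql_type(datetime.time(0, 0, 1, 500))
    1000500
    >>> time_type.from_sql_type(1000500)
    datetime.time(0, 0, 1, 500)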
+
+
+class TimestampKind(Enum):
+    """
+    Timestamp kind, i.e. the time attribute metadata attached to timestamps.
+    """
+    REGULAR = 0
+    ROWTIME = 1
+    PROCTIME = 2
 
 
 class TimestampType(AtomicType):
     """
     Timestamp data type.  SQL TIMESTAMP
+
+    The precision must be greater than or equal to 0 and less than or equal to 9.
+
+    :param kind: the time attribute metadata (default: TimestampKind.REGULAR)
+    :param precision: int, the number of digits of fractional seconds (default: 6)
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
-    __metaclass__ = DataTypeSingleton
+    def __init__(self, kind=TimestampKind.REGULAR, precision=6, nullable=True):
+        super(TimestampType, self).__init__(nullable)
+        assert isinstance(kind, TimestampKind)
+        assert 0 <= precision <= 9
+        self.kind = kind
+        self.precision = precision
 
+    def __repr__(self):
+        return "TimestampType(%s, %s, %s)" % (
+            self.kind.name, self.precision, str(self._nullable).lower())
 
-class DataTypes(object):
+    def json_value(self):
+        j = super(TimestampType, self).json_value()
+        j.update({"kind": self.kind.name, "precision": self.precision})
+        return j
+
+    def need_conversion(self):
+        return True
+
+    def to_sql_type(self, dt):
+        if dt is not None:
+            seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
+                       else time.mktime(dt.timetuple()))
+            return int(seconds) * 10**6 + dt.microsecond
+
+    def from_sql_type(self, ts):
+        if ts is not None:
+            # using int to avoid precision loss in float
+            return datetime.datetime.fromtimestamp(ts // 10**6).replace(microsecond=ts % 10**6)
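
Sketch (not part of the change): timestamps are stored internally as microseconds since the epoch. Naive datetimes go through time.mktime, so the integer value depends on the local timezone, but the round trip is consistent on the same machine:

    >>> ts_type = TimestampType(precision=3)
    >>> micros = ts_type.to_sql_type(datetime.datetime(1970, 1, 2, 0, 0, 0, 123))
    >>> ts_type.from_sql_type(micros)
    datetime.datetime(1970, 1, 2, 0, 0, 0, 123)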
+
+
+class ArrayType(DataType):
+    """
+    Array data type.
+
+    :param element_type: :class:`DataType` of each element in the array.
+    :param nullable: boolean, whether the field can be null (None) or not.
+    """
+
+    def __init__(self, element_type, nullable=True):
+        """
+        >>> ArrayType(VarCharType()) == ArrayType(VarCharType())
+        True
+        >>> ArrayType(VarCharType()) == ArrayType(BigIntType())
+        False
+        """
+        assert isinstance(element_type, DataType), \
+            "element_type %s should be an instance of %s" % (element_type, DataType)
+        super(ArrayType, self).__init__(nullable)
+        self.element_type = element_type
+
+    def __repr__(self):
+        return "ArrayType(%s, %s)" % (self.element_type, str(self.nullable).lower())
+
+    def json_value(self):
+        j = super(ArrayType, self).json_value()
+        j.update({"element_type": self.element_type.json_value()})
+        return j
+
+    @classmethod
+    def from_json(cls, json):
+        return ArrayType(_parse_datatype_json_value(json["element_type"]), json["nullable"])
+
+    def need_conversion(self):
+        return self.element_type.need_conversion()
+
+    def to_sql_type(self, obj):
+        if not self.need_conversion():
+            return obj
+        return obj and [self.element_type.to_sql_type(v) for v in obj]
+
+    def from_sql_type(self, obj):
+        if not self.need_conversion():
+            return obj
+        return obj and [self.element_type.from_sql_type(v) for v in obj]
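
Sketch: need_conversion() propagates from the element type, so arrays of primitives pass through untouched while, e.g., arrays of dates are converted element by element:

    >>> ArrayType(BigIntType()).need_conversion()
    False
    >>> ArrayType(DateType()).to_sql_type([datetime.date(1970, 1, 2)])
    [1]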
+
+
+class MapType(DataType):
+    """
+    Map data type.
+
+    :param key_type: :class:`DataType` of the keys in the map.
+    :param value_type: :class:`DataType` of the values in the map.
+    :param nullable: boolean, whether the field can be null (None) or not.
+
+    Keys in a map data type are not allowed to be null (None).
+    """
+
+    def __init__(self, key_type, value_type, nullable=True):
+        """
+        >>> (MapType(VarCharType(nullable=False), IntType())
+        ...        == MapType(VarCharType(nullable=False), IntType()))
+        True
+        >>> (MapType(VarCharType(nullable=False), IntType())
+        ...        == MapType(VarCharType(nullable=False), FloatType()))
+        False
+        """
+        assert isinstance(key_type, DataType), \
+            "key_type %s should be an instance of %s" % (key_type, DataType)
+        assert isinstance(value_type, DataType), \
+            "value_type %s should be an instance of %s" % (value_type, DataType)
+        super(MapType, self).__init__(nullable)
+        self.key_type = key_type
+        self.value_type = value_type
+
+    def __repr__(self):
+        return "MapType(%s, %s, %s)" % (self.key_type, self.value_type, str(self.nullable).lower())
+
+    def json_value(self):
+        j = super(MapType, self).json_value()
+        j.update({"key_type": self.key_type.json_value(),
+                  "value_type": self.value_type.json_value()})
+        return j
+
+    @classmethod
+    def from_json(cls, json):
+        return MapType(_parse_datatype_json_value(json["key_type"]),
+                       _parse_datatype_json_value(json["value_type"]),
+                       json["nullable"])
+
+    def need_conversion(self):
+        return self.key_type.need_conversion() or self.value_type.need_conversion()
+
+    def to_sql_type(self, obj):
+        if not self.need_conversion():
+            return obj
+        return obj and dict((self.key_type.to_sql_type(k), self.value_type.to_sql_type(v))
+                            for k, v in obj.items())
+
+    def from_sql_type(self, obj):
+        if not self.need_conversion():
+            return obj
+        return obj and dict((self.key_type.from_sql_type(k), self.value_type.from_sql_type(v))
+                            for k, v in obj.items())
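
Sketch of the key/value conversion (editorial, not part of the change):

    >>> map_type = MapType(VarCharType(10), DateType())
    >>> map_type.to_sql_type({'epoch+1': datetime.date(1970, 1, 2)})
    {'epoch+1': 1}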
+
+
+class RowField(object):
+    """
+    A field in :class:`RowType`.
+
+    :param name: string, name of the field.
+    :param data_type: :class:`DataType` of the field.
+    :param description: string, description of the field.
+    """
+
+    def __init__(self, name, data_type, description=None):
+        """
+        >>> (RowField("f1", VarCharType())
+        ...      == RowField("f1", VarCharType()))
+        True
+        >>> (RowField("f1", VarCharType())
+        ...      == RowField("f2", VarCharType()))
+        False
+        """
+        assert isinstance(data_type, DataType), \
+            "data_type %s should be an instance of %s" % (data_type, DataType)
+        assert isinstance(name, basestring), "field name %s should be string" % name
+        if not isinstance(name, str):
+            name = name.encode('utf-8')
+        if description is not None:
+            assert isinstance(description, basestring), \
+                "description %s should be string" % description
+            if not isinstance(description, str):
+                description = description.encode('utf-8')
+        self.name = name
+        self.data_type = data_type
+        self.description = '...' if description is None else description
+
+    def __repr__(self):
+        return "RowField(%s, %s, %s)" % (self.name, self.data_type, self.description)
+
+    def __eq__(self, other):
+        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+    def json_value(self):
+        return {"name": self.name,
+                "type": self.data_type.json_value(),
+                "description": self.description}
+
+    @classmethod
+    def from_json(cls, json):
+        return RowField(json["name"],
+                        _parse_datatype_json_value(json["type"]),
+                        json["description"])
+
+    def need_conversion(self):
+        return self.data_type.need_conversion()
+
+    def to_sql_type(self, obj):
+        return self.data_type.to_sql_type(obj)
+
+    def from_sql_type(self, obj):
+        return self.data_type.from_sql_type(obj)
+
+
+class RowType(DataType):
+    """
+    Row type, consisting of a list of :class:`RowField`.
+
+    This is the data type representing a :class:`Row`.
+
+    Iterating a :class:`RowType` will iterate its :class:`RowField`\\s.
+    A contained :class:`RowField` can be accessed by name or position.
+
+    >>> row1 = RowType([RowField("f1", VarCharType())])
+    >>> row1["f1"]
+    RowField(f1, VarCharType(1))
+    >>> row1[0]
+    RowField(f1, VarCharType(1))
+    """
+
+    def __init__(self, fields=None, nullable=True):
+        """
+        >>> row1 = RowType([RowField("f1", VarCharType())])
+        >>> row2 = RowType([RowField("f1", VarCharType())])
+        >>> row1 == row2
+        True
+        >>> row1 = RowType([RowField("f1", VarCharType())])
+        >>> row2 = RowType([RowField("f1", VarCharType()),
+        ...     RowField("f2", IntType())])
+        >>> row1 == row2
+        False
+        """
+        super(RowType, self).__init__(nullable)
+        if not fields:
+            self.fields = []
+            self.names = []
+        else:
+            self.fields = fields
+            self.names = [f.name for f in fields]
+            assert all(isinstance(f, RowField) for f in fields), \
+                "fields should be a list of RowField"
+        # Precalculated list of fields that need conversion with
+        # from_sql_type/to_sql_type functions
+        self._need_conversion = [f.need_conversion() for f in self]
+        self._need_serialize_any_field = any(self._need_conversion)
+
+    def add(self, field, data_type=None):
+        """
+        Constructs a RowType by adding new fields to it, defining the schema.
+        The method accepts either:
+
+            a) a single parameter which is a RowField object, or
+            b) 2 parameters as (name, data_type), where the data_type parameter may be
+               either a string or a DataType object.
+
+        >>> row1 = RowType().add("f1", VarCharType()).add("f2", VarCharType())
+        >>> row2 = RowType([RowField("f1", VarCharType()), RowField("f2", VarCharType())])
+        >>> row1 == row2
+        True
+        >>> row1 = RowType().add(RowField("f1", VarCharType()))
+        >>> row2 = RowType([RowField("f1", VarCharType())])
+        >>> row1 == row2
+        True
+        >>> row1 = RowType().add("f1", "string")
+        >>> row2 = RowType([RowField("f1", VarCharType())])
+        >>> row1 == row2
+        True
+
+        :param field: Either the name of the field or a RowField object
+        :param data_type: If present, the DataType of the RowField to create
+        :return: a new updated RowType
+        """
+        if isinstance(field, RowField):
+            self.fields.append(field)
+            self.names.append(field.name)
+        else:
+            if isinstance(field, str) and data_type is None:
+                raise ValueError("Must specify DataType if passing name of row_field to create.")
+
+            if isinstance(data_type, str):
+                data_type_f = _parse_datatype_json_value(data_type)
+            else:
+                data_type_f = data_type
+            self.fields.append(RowField(field, data_type_f))
+            self.names.append(field)
+        # Precalculated list of fields that need conversion with
+        # from_sql_type/to_sql_type functions
+        self._need_conversion = [f.need_conversion() for f in self]
+        self._need_serialize_any_field = any(self._need_conversion)
+        return self
+
+    def __iter__(self):
+        """
+        Iterate the fields.
+        """
+        return iter(self.fields)
+
+    def __len__(self):
+        """
+        Returns the number of fields.
+        """
+        return len(self.fields)
+
+    def __getitem__(self, key):
+        """
+        Accesses fields by name or slice.
+        """
+        if isinstance(key, str):
+            for field in self:
+                if field.name == key:
+                    return field
+            raise KeyError('No RowField named {0}'.format(key))
+        elif isinstance(key, int):
+            try:
+                return self.fields[key]
+            except IndexError:
+                raise IndexError('RowType index out of range')
+        elif isinstance(key, slice):
+            return RowType(self.fields[key])
+        else:
+            raise TypeError('RowType keys should be strings, integers or slices')
+
+    def __repr__(self):
+        return "RowType(%s)" % ",".join(str(field) for field in self)
+
+    def json_value(self):
+        j = super(RowType, self).json_value()
+        j.update({"fields": [f.json_value() for f in self]})
+        return j
+
+    @classmethod
+    def from_json(cls, json):
+        return RowType([RowField.from_json(f) for f in json["fields"]], json["nullable"])
+
+    def field_names(self):
+        """
+        Returns all field names in a list.
+
+        >>> row = RowType([RowField("f1", VarCharType())])
+        >>> row.field_names()
+        ['f1']
+        """
+        return list(self.names)
+
+    def need_conversion(self):
+        # We need to convert Row()/namedtuple into tuple()
+        return True
+
+    def to_sql_type(self, obj):
+        if obj is None:
+            return
+
+        if self._need_serialize_any_field:
+            # Only calling to_sql_type function for fields that need conversion
+            if isinstance(obj, dict):
+                return tuple(f.to_sql_type(obj.get(n)) if c else obj.get(n)
+                             for n, f, c in zip(self.names, self.fields, self._need_conversion))
+            elif isinstance(obj, (tuple, list)):
+                return tuple(f.to_sql_type(v) if c else v
+                             for f, v, c in zip(self.fields, obj, self._need_conversion))
+            elif hasattr(obj, "__dict__"):
+                d = obj.__dict__
+                return tuple(f.to_sql_type(d.get(n)) if c else d.get(n)
+                             for n, f, c in zip(self.names, self.fields, self._need_conversion))
+            else:
+                raise ValueError("Unexpected tuple %r with RowType" % obj)
+        else:
+            if isinstance(obj, dict):
+                return tuple(obj.get(n) for n in self.names)
+            elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False):
+                return tuple(obj[n] for n in self.names)
+            elif isinstance(obj, (list, tuple)):
+                return tuple(obj)
+            elif hasattr(obj, "__dict__"):
+                d = obj.__dict__
+                return tuple(d.get(n) for n in self.names)
+            else:
+                raise ValueError("Unexpected tuple %r with RowType" % obj)
+
+    def from_sql_type(self, obj):
+        if obj is None:
+            return
+        if isinstance(obj, Row):
+            # it's already converted by pickler
+            return obj
+        if self._need_serialize_any_field:
+            # Only calling from_sql_type function for fields that need conversion
+            values = [f.from_sql_type(v) if c else v
+                      for f, v, c in zip(self.fields, obj, self._need_conversion)]
+        else:
+            values = obj
+        return _create_row(self.names, values)
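
Sketch: dicts, tuples/lists and plain objects are all flattened into a positional tuple, converting only the fields that need it:

    >>> row_type = RowType([RowField("name", VarCharType(10)),
    ...                     RowField("birthday", DateType())])
    >>> row_type.to_sql_type({"name": "flink", "birthday": datetime.date(1970, 1, 2)})
    ('flink', 1)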
+
+
+class UserDefinedType(DataType):
+    """
+    User-defined type (UDT).
+
+    .. note:: WARN: Flink Internal Use Only
+    """
+
+    def __eq__(self, other):
+        return type(self) == type(other)
+
+    @classmethod
+    def type_name(cls):
+        return cls.__name__.lower()
+
+    @classmethod
+    def sql_type(cls):
+        """
+        Underlying SQL storage type for this UDT.
+        """
+        raise NotImplementedError("UDT must implement sql_type().")
+
+    @classmethod
+    def module(cls):
+        """
+        The Python module of the UDT.
+        """
+        raise NotImplementedError("UDT must implement module().")
+
+    @classmethod
+    def java_udt(cls):
+        """
+        The class name of the paired Java UDT (could be '', if there
+        is no corresponding one).
+        """
+        return ''
+
+    def need_conversion(self):
+        return True
+
+    @classmethod
+    def _cached_sql_type(cls):
+        """
+        Caches the sql_type() on the class, because it's heavily used in `to_sql_type`.
+        """
+        if not hasattr(cls, "__cached_sql_type__"):
+            cls.__cached_sql_type__ = cls.sql_type()
+        return cls.__cached_sql_type__
+
+    def to_sql_type(self, obj):
+        if obj is not None:
+            return self._cached_sql_type().to_sql_type(self.serialize(obj))
+
+    def from_sql_type(self, obj):
+        v = self._cached_sql_type().from_sql_type(obj)
+        if v is not None:
+            return self.deserialize(v)
+
+    def serialize(self, obj):
+        """
+        Converts a user-type object into a SQL datum.
+        """
+        raise NotImplementedError("UDT must implement serialize().")
+
+    def deserialize(self, datum):
+        """
+        Converts a SQL datum into a user-type object.
+        """
+        raise NotImplementedError("UDT must implement deserialize().")
+
+    def json(self):
+        return json.dumps(self.json_value(),
+                          separators=(',', ':'),
+                          sort_keys=True)
+
+    def json_value(self):
+        if self.java_udt():
+            assert self.module() != '__main__', 'UDT in __main__ cannot work with JavaUDT'
+            schema = {
+                "type": "udt",
+                "class": self.java_udt(),
+                "py_class": "%s.%s" % (self.module(), type(self).__name__),
+                "sql_type": self.sql_type().json_value()
+            }
+        else:
+            b = PickleSerializer().dumps(type(self))
+            schema = {
+                "type": "udt",
+                "py_class": "%s.%s" % (self.module(), type(self).__name__),
+                "serialized_class": base64.b64encode(b).decode('utf8'),
+                "sql_type": self.sql_type().json_value()
+            }
+        return schema
+
+    @classmethod
+    def from_json(cls, json):
+        py_udf = str(json["py_class"])  # convert unicode to str
+        split = py_udf.rfind(".")
+        py_module = py_udf[:split]
+        py_class = py_udf[split + 1:]
+        m = __import__(py_module, globals(), locals(), [py_class])
+        if not hasattr(m, py_class):
+            s = base64.b64decode(json['serialized_class'].encode('utf-8'))
+            UDT = PickleSerializer().loads(s)
+        else:
+            UDT = getattr(m, py_class)
+        return UDT()
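
To make the contract concrete, a hypothetical UDT (illustration only; Point, PointUDT and "my_geometry" are invented names, not part of this change) that stores a 2D point as an array of doubles:

    class PointUDT(UserDefinedType):
        @classmethod
        def sql_type(cls):
            # the underlying SQL storage type
            return ArrayType(DoubleType())

        @classmethod
        def module(cls):
            return "my_geometry"  # hypothetical module name

        def serialize(self, obj):
            return [obj.x, obj.y]

        def deserialize(self, datum):
            return Point(datum[0], datum[1])

    class Point(object):
        __UDT__ = PointUDT()  # picked up by _infer_type() below via hasattr

        def __init__(self, x, y):
            self.x, self.y = x, y

    >>> PointUDT().to_sql_type(Point(1.0, 2.0))
    [1.0, 2.0]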
+
+
+_static_length_types = [BooleanType, FloatType, DoubleType, TinyIntType,
+                        SmallIntType, IntType, BigIntType, DateType]
+_static_length_type_mappings = dict((t.type_name(), t) for t in _static_length_types)
+
+_var_length_types = [BinaryType, VarBinaryType, CharType, VarCharType]
+_var_length_type_mappings = dict((t.type_name(), t) for t in _var_length_types)
+
+_complex_type_mappings = dict((t.type_name(), t) for t in [ArrayType, MapType, RowType])
+
+
+def _parse_datatype_json_value(json_value):
+    tpe = json_value["type"]
+    if tpe == 'NULL':
+        return NullType()
+    elif tpe == 'DECIMAL':
+        precision = json_value["precision"]
+        scale = json_value["scale"]
+        nullable = json_value["nullable"]
+        return DecimalType(precision, scale, nullable)
+    elif tpe == 'TIME':
+        precision = json_value["precision"]
+        nullable = json_value["nullable"]
+        return TimeType(precision, nullable)
+    elif tpe == 'TIMESTAMP':
+        kind = json_value["kind"]
+        precision = json_value["precision"]
+        nullable = json_value["nullable"]
+        return TimestampType(TimestampKind[kind], precision, nullable)
+    elif tpe in _static_length_type_mappings:
+        nullable = json_value["nullable"]
+        return _static_length_type_mappings[tpe](nullable)
+    elif tpe in _var_length_type_mappings:
+        length = json_value["length"]
+        nullable = json_value["nullable"]
+        return _var_length_type_mappings[tpe](length, nullable)
+    elif tpe in _complex_type_mappings:
+        return _complex_type_mappings[tpe].from_json(json_value)
+    elif tpe == 'udt':
+        return UserDefinedType.from_json(json_value)
+    else:
+        raise ValueError("Could not parse type: %s" % json_value)
+
+
+# Mapping Python types to Flink SQL types
+_type_mappings = {
+    bool: BooleanType(),
+    int: BigIntType(),
+    float: DoubleType(),
+    str: VarCharType(0x7fffffff),
+    bytearray: VarBinaryType(0x7fffffff),
+    decimal.Decimal: DecimalType(38, 18),
+    datetime.date: DateType(),
+    datetime.datetime: TimestampType(),
+    datetime.time: TimeType(),
+}
+
+if sys.version < "3":
+    _type_mappings.update({
+        unicode: VarCharType(0x7fffffff),
+        long: BigIntType(),
+    })
+
+# Mapping Python array types to Flink SQL types
+# We should be careful here. The size of these types in Python depends on the C
+# implementation. We need to make sure that this conversion does not lose any
+# precision. Also, the JVM only supports signed types; when converting unsigned types,
+# keep in mind that they require 1 more bit when stored as signed types.
+#
+# Reference for C integer size, see:
+# ISO/IEC 9899:201x specification, chapter 5.2.4.2.1 Sizes of integer types <limits.h>.
+# Reference for python array typecode, see:
+# https://docs.python.org/2/library/array.html
+# https://docs.python.org/3.6/library/array.html
+# Reference for JVM's supported integral types:
+# http://docs.oracle.com/javase/specs/jvms/se8/html/jvms-2.html#jvms-2.3.1
+
+_array_signed_int_typecode_ctype_mappings = {
+    'b': ctypes.c_byte,
+    'h': ctypes.c_short,
+    'i': ctypes.c_int,
+    'l': ctypes.c_long,
+}
+
+_array_unsigned_int_typecode_ctype_mappings = {
+    'B': ctypes.c_ubyte,
+    'H': ctypes.c_ushort,
+    'I': ctypes.c_uint,
+    'L': ctypes.c_ulong
+}
+
+
+def _int_size_to_type(size):
+    """
+    Returns the data type that fits integers of the given size in bits.
+    """
+    if size <= 8:
+        return TinyIntType()
+    if size <= 16:
+        return SmallIntType()
+    if size <= 32:
+        return IntType()
+    if size <= 64:
+        return BigIntType()
+
+
+# The mapping of all supported array typecodes is built here
+_array_type_mappings = {
+    # Warning: the actual properties of float and double in C are not specified by
+    # the C standard. On almost every system supported by both Python and the JVM,
+    # they are the IEEE 754 single-precision and IEEE 754 double-precision binary
+    # floating-point formats. We assume the same here for now.
+    'f': FloatType(),
+    'd': DoubleType()
+}
+
+# compute array typecode mappings for signed integer types
+for _typecode in _array_signed_int_typecode_ctype_mappings.keys():
+    size = ctypes.sizeof(_array_signed_int_typecode_ctype_mappings[_typecode]) * 8
+    dt = _int_size_to_type(size)
+    if dt is not None:
+        _array_type_mappings[_typecode] = dt
+
+# compute array typecode mappings for unsigned integer types
+for _typecode in _array_unsigned_int_typecode_ctype_mappings.keys():
+    # JVM does not have unsigned types, so use signed types that are at least 1
+    # bit larger to store them
+    size = ctypes.sizeof(_array_unsigned_int_typecode_ctype_mappings[_typecode]) * 8 + 1
+    dt = _int_size_to_type(size)
+    if dt is not None:
+        _array_type_mappings[_typecode] = dt
+
+# Type code 'u' in Python's array has been deprecated since version 3.3 and will be
+# removed in version 4.0. See: https://docs.python.org/3/library/array.html
+if sys.version_info[0] < 4:
+    # it can be 16 bits or 32 bits depending on the platform
+    _array_type_mappings['u'] = CharType(ctypes.sizeof(ctypes.c_wchar))
+
+# Type code 'c' is only available in Python 2
+if sys.version_info[0] < 3:
+    _array_type_mappings['c'] = CharType(ctypes.sizeof(ctypes.c_char))
+
+
+def _infer_type(obj):
+    """
+    Infers the data type from obj.
+    """
+    if obj is None:
+        return NullType()
+
+    if hasattr(obj, '__UDT__'):
+        return obj.__UDT__
+
+    data_type = _type_mappings.get(type(obj))
+    if data_type is not None:
+        return data_type
+
+    if isinstance(obj, dict):
+        for key, value in obj.items():
+            if key is not None and value is not None:
+                return MapType(_infer_type(key).not_null(), _infer_type(value))
+        else:
+            return MapType(NullType(), NullType())
+    elif isinstance(obj, list):
+        for v in obj:
+            if v is not None:
+                return ArrayType(_infer_type(v))
+        else:
+            return ArrayType(NullType())
+    elif isinstance(obj, array):
+        if obj.typecode in _array_type_mappings:
+            return ArrayType(_array_type_mappings[obj.typecode].not_null())
+        else:
+            raise TypeError("not supported type: array(%s)" % obj.typecode)
+    else:
+        try:
+            return _infer_schema(obj)
+        except TypeError:
+            raise TypeError("not supported type: %s" % type(obj))
+
+
+def _infer_schema(row, names=None):
+    """
+    Infers the schema from dict/row/namedtuple/object.
+    """
+    if isinstance(row, dict):  # dict
+        items = sorted(row.items())
+
+    elif isinstance(row, (tuple, list)):
+        if hasattr(row, "__fields__"):  # Row
+            items = zip(row.__fields__, tuple(row))
+        elif hasattr(row, "_fields"):  # namedtuple
+            items = zip(row._fields, tuple(row))
+        else:
+            if names is None:
+                names = ['_%d' % i for i in range(1, len(row) + 1)]
+            elif len(names) < len(row):
+                names.extend('_%d' % i for i in range(len(names) + 1, len(row) + 1))
+            items = zip(names, row)
+
+    elif hasattr(row, "__dict__"):  # object
+        items = sorted(row.__dict__.items())
+
+    else:
+        raise TypeError("Can not infer schema for type: %s" % type(row))
+
+    fields = [RowField(k, _infer_type(v)) for k, v in items]
+    return RowType(fields)
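
Sketch: dict keys become (sorted) field names and the values drive type inference:

    >>> schema = _infer_schema({"name": "flink", "id": 1})
    >>> schema.field_names()
    ['id', 'name']
    >>> schema["id"].data_type
    BigIntType(true)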
+
+
+def _has_nulltype(dt):
+    """
+    Returns whether there is NullType in `dt` or not.
+    """
+    if isinstance(dt, RowType):
+        return any(_has_nulltype(f.data_type) for f in dt.fields)
+    elif isinstance(dt, ArrayType):
+        return _has_nulltype(dt.element_type)
+    elif isinstance(dt, MapType):
+        return _has_nulltype(dt.key_type) or _has_nulltype(dt.value_type)
+    else:
+        return isinstance(dt, NullType)
+
+
+def _merge_type(a, b, name=None):
+    if name is None:
+        def new_msg(msg):
+            return msg
+
+        def new_name(n):
+            return "field %s" % n
+    else:
+        def new_msg(msg):
+            return "%s: %s" % (name, msg)
+
+        def new_name(n):
+            return "field %s in %s" % (n, name)
+
+    if isinstance(a, NullType):
+        return b
+    elif isinstance(b, NullType):
+        return a
+    elif type(a) is not type(b):
+        # TODO: type cast (such as int -> long)
+        raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b))))
+
+    # same type
+    if isinstance(a, RowType):
+        nfs = dict((f.name, f.data_type) for f in b.fields)
+        fields = [RowField(f.name, _merge_type(f.data_type, nfs.get(f.name, None),
+                                               name=new_name(f.name)))
+                  for f in a.fields]
+        names = set([f.name for f in fields])
+        for n in nfs:
+            if n not in names:
+                fields.append(RowField(n, nfs[n]))
+        return RowType(fields)
+
+    elif isinstance(a, ArrayType):
+        return ArrayType(_merge_type(a.element_type, b.element_type,
+                                     name='element in array %s' % name))
+
+    elif isinstance(a, MapType):
+        return MapType(_merge_type(a.key_type, b.key_type, name='key of map %s' % name),
+                       _merge_type(a.value_type, b.value_type, name='value of map %s' % name))
+    else:
+        return a
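
Sketch: NullType gives way to the concrete type, and row schemas are merged field by field:

    >>> merged = _merge_type(RowType([RowField("a", NullType())]),
    ...                      RowType([RowField("a", BigIntType())]))
    >>> merged["a"].data_type
    BigIntType(true)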
+
+
+def _infer_schema_from_data(elements, names=None):
+    """
+    Infers schema from list of Row or tuple.
+
+    :param elements: list of Row or tuple
+    :param names: list of column names
+    :return: :class:`RowType`
+    """
+    if not elements:
+        raise ValueError("can not infer schema from empty data set")
+    schema = reduce(_merge_type, (_infer_schema(row, names) for row in elements))
+    if _has_nulltype(schema):
+        raise ValueError("Some column types cannot be determined after inferring")
+    return schema
+
+
+def _need_converter(data_type):
+    if isinstance(data_type, RowType):
+        return True
+    elif isinstance(data_type, ArrayType):
+        return _need_converter(data_type.element_type)
+    elif isinstance(data_type, MapType):
+        return _need_converter(data_type.key_type) or _need_converter(data_type.value_type)
+    elif isinstance(data_type, NullType):
+        return True
+    else:
+        return False
+
+
+def _create_converter(data_type):
+    """
+    Creates a converter to drop the names of fields in obj.
+    """
+    if not _need_converter(data_type):
+        return lambda x: x
+
+    if isinstance(data_type, ArrayType):
+        conv = _create_converter(data_type.element_type)
+        return lambda row: [conv(v) for v in row]
+
+    elif isinstance(data_type, MapType):
+        kconv = _create_converter(data_type.key_type)
+        vconv = _create_converter(data_type.value_type)
+        return lambda row: dict((kconv(k), vconv(v)) for k, v in row.items())
+
+    elif isinstance(data_type, NullType):
+        return lambda x: None
+
+    elif not isinstance(data_type, RowType):
+        return lambda x: x
+
+    # dataType must be RowType
+    names = [f.name for f in data_type.fields]
+    converters = [_create_converter(f.data_type) for f in data_type.fields]
+    convert_fields = any(_need_converter(f.data_type) for f in data_type.fields)
+
+    def convert_row(obj):
+        if obj is None:
+            return
+
+        if isinstance(obj, (tuple, list)):
+            if convert_fields:
+                return tuple(conv(v) for v, conv in zip(obj, converters))
+            else:
+                return tuple(obj)
+
+        if isinstance(obj, dict):
+            d = obj
+        elif hasattr(obj, "__dict__"):  # object
+            d = obj.__dict__
+        else:
+            raise TypeError("Unexpected obj type: %s" % type(obj))
+
+        if convert_fields:
+            return tuple([conv(d.get(name)) for name, conv in zip(names, converters)])
+        else:
+            return tuple([d.get(name) for name in names])
+
+    return convert_row
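
Sketch: the resulting converter turns named data into positional tuples matching the declared field order:

    >>> conv = _create_converter(RowType([RowField("a", BigIntType()),
    ...                                   RowField("b", VarCharType(10))]))
    >>> conv({"b": "flink", "a": 1})
    (1, 'flink')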
+
+
+def _to_java_type(data_type):
+    """
+    Converts Python type to Java type.
+    """
+    gateway = get_gateway()
+    return gateway.jvm.org.apache.flink.table.api.Types.fromJson(data_type.json())
 
 Review comment:
   Serializing all types to JSON format can reduce the communication between Python and the JVM, which is a good idea. But I think the other issue is that we would then need to add `fromJson()`, i.e. introduce a new dependency in the Java module, which brings maintenance costs on the Java side. I have left a comment about this in the Java section.  :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services