You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/12/17 06:42:48 UTC

[GitHub] [flink] HuangXingBo commented on a change in pull request #14401: [FLINK-20621][python] Refactor the TypeInformation implementation in Python DataStream API.

HuangXingBo commented on a change in pull request #14401:
URL: https://github.com/apache/flink/pull/14401#discussion_r544789178



##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -91,66 +73,117 @@ def from_internal_type(self, obj):
         return obj
 
 
-class BasicTypeInfo(TypeInformation, ABC):
+class BasicTypes(Enum):

Review comment:
       ```suggestion
   class BasicType(Enum):
   ```

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -91,66 +73,117 @@ def from_internal_type(self, obj):
         return obj
 
 
-class BasicTypeInfo(TypeInformation, ABC):
+class BasicTypes(Enum):
+    STRING = "String"
+    BYTE = "Byte"
+    BOOLEAN = "Boolean"
+    SHORT = "Short"
+    INT = "Integer"
+    LONG = "Long"
+    FLOAT = "Float"
+    DOUBLE = "Double"
+    CHAR = "Char"
+    BIG_INT = "BigInteger"
+    BIG_DEC = "BigDecimal"
+
+
+class BasicTypeInformation(TypeInformation, ABC):

Review comment:
       ```suggestion
   class BasicTypeInformation(TypeInformation):
   ```

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -160,227 +193,193 @@ class SqlTimeTypeInfo(TypeInformation, ABC):
 
     @staticmethod
     def DATE():
-        return DateTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.DATE)
+        return DateTypeInformation()
 
     @staticmethod
     def TIME():
-        return TimeTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIME)
+        return TimeTypeInformation()
 
     @staticmethod
     def TIMESTAMP():
-        return TimestampTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIMESTAMP)
+        return TimestampTypeInformation()
 
 
-class PrimitiveArrayTypeInfo(WrapperTypeInfo, ABC):
+class PrimitiveArrayTypeInformation(TypeInformation, ABC):
     """
     A TypeInformation for arrays of primitive types (int, long, double, ...).
     Supports the creation of dedicated efficient serializers for these types.
     """
 
-    @staticmethod
-    def BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def BYTE_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __init__(self, element_type: TypeInformation):
+        self._element_type = element_type
+        super(PrimitiveArrayTypeInformation, self).__init__()
 
-    @staticmethod
-    def SHORT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def INT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO)
+    def get_java_type_info(self) -> JavaObject:
+        JPrimitiveArrayTypeInfo = get_gateway().jvm.org.apache.flink.api.common.typeinfo \
+            .PrimitiveArrayTypeInfo
+        if self._element_type == Types.BOOLEAN():
+            return JPrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.BYTE():
+            return JPrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.SHORT():
+            return JPrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.INT():
+            return JPrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.LONG():
+            return JPrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.FLOAT():
+            return JPrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.DOUBLE():
+            return JPrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.CHAR():
+            return JPrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO
+        else:
+            raise TypeError("Invalid element type for a primitive array.")
 
-    @staticmethod
-    def LONG_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __eq__(self, o) -> bool:
+        if isinstance(o, PrimitiveArrayTypeInformation):
+            return self._element_type == o._element_type
+        return False
 
-    @staticmethod
-    def FLOAT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __repr__(self) -> str:
+        return "PrimitiveArrayTypeInformation<%s>" % self._element_type
 
-    @staticmethod
-    def DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def CHAR_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO)
+def is_primitive_array_type_info(type_info: TypeInformation):
+    return isinstance(type_info, PrimitiveArrayTypeInformation)
 
 
-def is_primitive_array_type_info(type_info: TypeInformation):
-    return type_info in {
-        PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO()
-    }
-
-
-class BasicArrayTypeInfo(WrapperTypeInfo, ABC):
+class BasicArrayTypeInformation(TypeInformation, ABC):
     """
     A TypeInformation for arrays of boxed primitive types (Integer, Long, Double, ...).
     Supports the creation of dedicated efficient serializers for these types.
     """
-    @staticmethod
-    def BOOLEAN_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def BYTE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO)
+    def __init__(self, element_type: TypeInformation):
+        self._element_type = element_type
+        super(BasicArrayTypeInformation, self).__init__()
 
-    @staticmethod
-    def SHORT_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def INT_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def LONG_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO)
+    def get_java_type_info(self) -> JavaObject:
+        JBasicArrayTypeInfo = get_gateway().jvm.org.apache.flink.api.common.typeinfo \
+            .BasicArrayTypeInfo
+        if self._element_type == Types.BOOLEAN():
+            return JBasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO
+        elif self._element_type == Types.BYTE():
+            return JBasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.SHORT():
+            return JBasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO
+        elif self._element_type == Types.INT():
+            return JBasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
+        elif self._element_type == Types.LONG():
+            return JBasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO
+        elif self._element_type == Types.FLOAT():
+            return JBasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO
+        elif self._element_type == Types.DOUBLE():
+            return JBasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.CHAR():
+            return JBasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO
+        elif self._element_type == Types.STRING():
+            return JBasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO
+        else:
+            raise TypeError("Invalid element type for a primitive array.")
 
-    @staticmethod
-    def FLOAT_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO)
+    def __eq__(self, o) -> bool:
+        if isinstance(o, BasicArrayTypeInformation):
+            return self._element_type == o._element_type
+        return False
 
-    @staticmethod
-    def DOUBLE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO)
+    def __repr__(self):
+        return "BasicArrayTypeInformation<%s>" % self._element_type
 
-    @staticmethod
-    def CHAR_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def STRING_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)
+def is_basic_array_type_info(type_info: TypeInformation):
+    return isinstance(type_info, BasicArrayTypeInformation)
 
 
-def is_basic_array_type_info(type_info: TypeInformation):
-    return type_info in {
-        BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO()
-    }
-
-
-class PickledBytesTypeInfo(WrapperTypeInfo, ABC):
+class PickledBytesTypeInfo(TypeInformation, ABC):

Review comment:
       ```suggestion
   class PickledBytesTypeInfo(TypeInformation):
   ```

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -48,30 +48,12 @@ class acts as the tool to generate serializers and comparators, and to perform s
     nested types).
     """
 
-
-class WrapperTypeInfo(TypeInformation):
-    """
-    A wrapper class for java TypeInformation Objects.
-    """
-
-    def __init__(self, j_typeinfo):
-        self._j_typeinfo = j_typeinfo
+    def __init__(self):
+        self._j_typeinfo = None
 
     def get_java_type_info(self) -> JavaObject:

Review comment:
       Since the subclasses of `TypeInformation` will always override this method, I think this method can be declared as an abstract method.

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -160,227 +193,193 @@ class SqlTimeTypeInfo(TypeInformation, ABC):
 
     @staticmethod
     def DATE():
-        return DateTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.DATE)
+        return DateTypeInformation()
 
     @staticmethod
     def TIME():
-        return TimeTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIME)
+        return TimeTypeInformation()
 
     @staticmethod
     def TIMESTAMP():
-        return TimestampTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIMESTAMP)
+        return TimestampTypeInformation()
 
 
-class PrimitiveArrayTypeInfo(WrapperTypeInfo, ABC):
+class PrimitiveArrayTypeInformation(TypeInformation, ABC):

Review comment:
       ```suggestion
   class PrimitiveArrayTypeInformation(TypeInformation):
   ```

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -160,227 +193,193 @@ class SqlTimeTypeInfo(TypeInformation, ABC):
 
     @staticmethod
     def DATE():
-        return DateTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.DATE)
+        return DateTypeInformation()
 
     @staticmethod
     def TIME():
-        return TimeTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIME)
+        return TimeTypeInformation()
 
     @staticmethod
     def TIMESTAMP():
-        return TimestampTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIMESTAMP)
+        return TimestampTypeInformation()
 
 
-class PrimitiveArrayTypeInfo(WrapperTypeInfo, ABC):
+class PrimitiveArrayTypeInformation(TypeInformation, ABC):
     """
     A TypeInformation for arrays of primitive types (int, long, double, ...).
     Supports the creation of dedicated efficient serializers for these types.
     """
 
-    @staticmethod
-    def BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def BYTE_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __init__(self, element_type: TypeInformation):
+        self._element_type = element_type
+        super(PrimitiveArrayTypeInformation, self).__init__()
 
-    @staticmethod
-    def SHORT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def INT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO)
+    def get_java_type_info(self) -> JavaObject:
+        JPrimitiveArrayTypeInfo = get_gateway().jvm.org.apache.flink.api.common.typeinfo \
+            .PrimitiveArrayTypeInfo
+        if self._element_type == Types.BOOLEAN():
+            return JPrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.BYTE():
+            return JPrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.SHORT():
+            return JPrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.INT():
+            return JPrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.LONG():
+            return JPrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.FLOAT():
+            return JPrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.DOUBLE():
+            return JPrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.CHAR():
+            return JPrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO
+        else:
+            raise TypeError("Invalid element type for a primitive array.")
 
-    @staticmethod
-    def LONG_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __eq__(self, o) -> bool:
+        if isinstance(o, PrimitiveArrayTypeInformation):
+            return self._element_type == o._element_type
+        return False
 
-    @staticmethod
-    def FLOAT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __repr__(self) -> str:
+        return "PrimitiveArrayTypeInformation<%s>" % self._element_type
 
-    @staticmethod
-    def DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def CHAR_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO)
+def is_primitive_array_type_info(type_info: TypeInformation):
+    return isinstance(type_info, PrimitiveArrayTypeInformation)
 
 
-def is_primitive_array_type_info(type_info: TypeInformation):
-    return type_info in {
-        PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO()
-    }
-
-
-class BasicArrayTypeInfo(WrapperTypeInfo, ABC):
+class BasicArrayTypeInformation(TypeInformation, ABC):
     """
     A TypeInformation for arrays of boxed primitive types (Integer, Long, Double, ...).
     Supports the creation of dedicated efficient serializers for these types.
     """
-    @staticmethod
-    def BOOLEAN_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def BYTE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO)
+    def __init__(self, element_type: TypeInformation):
+        self._element_type = element_type
+        super(BasicArrayTypeInformation, self).__init__()
 
-    @staticmethod
-    def SHORT_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def INT_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def LONG_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO)
+    def get_java_type_info(self) -> JavaObject:
+        JBasicArrayTypeInfo = get_gateway().jvm.org.apache.flink.api.common.typeinfo \
+            .BasicArrayTypeInfo
+        if self._element_type == Types.BOOLEAN():
+            return JBasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO
+        elif self._element_type == Types.BYTE():
+            return JBasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.SHORT():
+            return JBasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO
+        elif self._element_type == Types.INT():
+            return JBasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
+        elif self._element_type == Types.LONG():
+            return JBasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO
+        elif self._element_type == Types.FLOAT():
+            return JBasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO
+        elif self._element_type == Types.DOUBLE():
+            return JBasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.CHAR():
+            return JBasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO
+        elif self._element_type == Types.STRING():
+            return JBasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO
+        else:
+            raise TypeError("Invalid element type for a primitive array.")
 
-    @staticmethod
-    def FLOAT_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO)
+    def __eq__(self, o) -> bool:
+        if isinstance(o, BasicArrayTypeInformation):
+            return self._element_type == o._element_type
+        return False
 
-    @staticmethod
-    def DOUBLE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO)
+    def __repr__(self):
+        return "BasicArrayTypeInformation<%s>" % self._element_type
 
-    @staticmethod
-    def CHAR_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def STRING_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)
+def is_basic_array_type_info(type_info: TypeInformation):

Review comment:
       I think we can remove this method

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -160,227 +193,193 @@ class SqlTimeTypeInfo(TypeInformation, ABC):
 
     @staticmethod
     def DATE():
-        return DateTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.DATE)
+        return DateTypeInformation()
 
     @staticmethod
     def TIME():
-        return TimeTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIME)
+        return TimeTypeInformation()
 
     @staticmethod
     def TIMESTAMP():
-        return TimestampTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIMESTAMP)
+        return TimestampTypeInformation()
 
 
-class PrimitiveArrayTypeInfo(WrapperTypeInfo, ABC):
+class PrimitiveArrayTypeInformation(TypeInformation, ABC):
     """
     A TypeInformation for arrays of primitive types (int, long, double, ...).
     Supports the creation of dedicated efficient serializers for these types.
     """
 
-    @staticmethod
-    def BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def BYTE_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __init__(self, element_type: TypeInformation):
+        self._element_type = element_type
+        super(PrimitiveArrayTypeInformation, self).__init__()
 
-    @staticmethod
-    def SHORT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def INT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO)
+    def get_java_type_info(self) -> JavaObject:
+        JPrimitiveArrayTypeInfo = get_gateway().jvm.org.apache.flink.api.common.typeinfo \
+            .PrimitiveArrayTypeInfo
+        if self._element_type == Types.BOOLEAN():
+            return JPrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.BYTE():
+            return JPrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.SHORT():
+            return JPrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.INT():
+            return JPrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.LONG():
+            return JPrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.FLOAT():
+            return JPrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.DOUBLE():
+            return JPrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.CHAR():
+            return JPrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO
+        else:
+            raise TypeError("Invalid element type for a primitive array.")
 
-    @staticmethod
-    def LONG_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __eq__(self, o) -> bool:
+        if isinstance(o, PrimitiveArrayTypeInformation):
+            return self._element_type == o._element_type
+        return False
 
-    @staticmethod
-    def FLOAT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __repr__(self) -> str:
+        return "PrimitiveArrayTypeInformation<%s>" % self._element_type
 
-    @staticmethod
-    def DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def CHAR_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO)
+def is_primitive_array_type_info(type_info: TypeInformation):

Review comment:
       I think we can remove this method.

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -160,227 +193,193 @@ class SqlTimeTypeInfo(TypeInformation, ABC):
 
     @staticmethod
     def DATE():
-        return DateTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.DATE)
+        return DateTypeInformation()
 
     @staticmethod
     def TIME():
-        return TimeTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIME)
+        return TimeTypeInformation()
 
     @staticmethod
     def TIMESTAMP():
-        return TimestampTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIMESTAMP)
+        return TimestampTypeInformation()
 
 
-class PrimitiveArrayTypeInfo(WrapperTypeInfo, ABC):
+class PrimitiveArrayTypeInformation(TypeInformation, ABC):
     """
     A TypeInformation for arrays of primitive types (int, long, double, ...).
     Supports the creation of dedicated efficient serializers for these types.
     """
 
-    @staticmethod
-    def BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def BYTE_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __init__(self, element_type: TypeInformation):
+        self._element_type = element_type
+        super(PrimitiveArrayTypeInformation, self).__init__()
 
-    @staticmethod
-    def SHORT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def INT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO)
+    def get_java_type_info(self) -> JavaObject:
+        JPrimitiveArrayTypeInfo = get_gateway().jvm.org.apache.flink.api.common.typeinfo \
+            .PrimitiveArrayTypeInfo
+        if self._element_type == Types.BOOLEAN():
+            return JPrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.BYTE():
+            return JPrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.SHORT():
+            return JPrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.INT():
+            return JPrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.LONG():
+            return JPrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.FLOAT():
+            return JPrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.DOUBLE():
+            return JPrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.CHAR():
+            return JPrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO
+        else:
+            raise TypeError("Invalid element type for a primitive array.")
 
-    @staticmethod
-    def LONG_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __eq__(self, o) -> bool:
+        if isinstance(o, PrimitiveArrayTypeInformation):
+            return self._element_type == o._element_type
+        return False
 
-    @staticmethod
-    def FLOAT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __repr__(self) -> str:
+        return "PrimitiveArrayTypeInformation<%s>" % self._element_type
 
-    @staticmethod
-    def DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def CHAR_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO)
+def is_primitive_array_type_info(type_info: TypeInformation):
+    return isinstance(type_info, PrimitiveArrayTypeInformation)
 
 
-def is_primitive_array_type_info(type_info: TypeInformation):
-    return type_info in {
-        PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO()
-    }
-
-
-class BasicArrayTypeInfo(WrapperTypeInfo, ABC):
+class BasicArrayTypeInformation(TypeInformation, ABC):
     """
     A TypeInformation for arrays of boxed primitive types (Integer, Long, Double, ...).
     Supports the creation of dedicated efficient serializers for these types.
     """
-    @staticmethod
-    def BOOLEAN_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def BYTE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO)
+    def __init__(self, element_type: TypeInformation):
+        self._element_type = element_type
+        super(BasicArrayTypeInformation, self).__init__()
 
-    @staticmethod
-    def SHORT_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def INT_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def LONG_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO)
+    def get_java_type_info(self) -> JavaObject:
+        JBasicArrayTypeInfo = get_gateway().jvm.org.apache.flink.api.common.typeinfo \
+            .BasicArrayTypeInfo
+        if self._element_type == Types.BOOLEAN():
+            return JBasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO
+        elif self._element_type == Types.BYTE():
+            return JBasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.SHORT():
+            return JBasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO
+        elif self._element_type == Types.INT():
+            return JBasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
+        elif self._element_type == Types.LONG():
+            return JBasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO
+        elif self._element_type == Types.FLOAT():
+            return JBasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO
+        elif self._element_type == Types.DOUBLE():
+            return JBasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.CHAR():
+            return JBasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO
+        elif self._element_type == Types.STRING():
+            return JBasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO
+        else:
+            raise TypeError("Invalid element type for a primitive array.")
 
-    @staticmethod
-    def FLOAT_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO)
+    def __eq__(self, o) -> bool:
+        if isinstance(o, BasicArrayTypeInformation):
+            return self._element_type == o._element_type
+        return False
 
-    @staticmethod
-    def DOUBLE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO)
+    def __repr__(self):
+        return "BasicArrayTypeInformation<%s>" % self._element_type
 
-    @staticmethod
-    def CHAR_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def STRING_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)
+def is_basic_array_type_info(type_info: TypeInformation):
+    return isinstance(type_info, BasicArrayTypeInformation)
 
 
-def is_basic_array_type_info(type_info: TypeInformation):
-    return type_info in {
-        BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO()
-    }
-
-
-class PickledBytesTypeInfo(WrapperTypeInfo, ABC):
+class PickledBytesTypeInfo(TypeInformation, ABC):
     """
     A PickledBytesTypeInfo indicates the data is a primitive byte array generated by pickle
     serializer.
     """
 
     @staticmethod
     def PICKLED_BYTE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(get_gateway().jvm.org.apache.flink.streaming.api.typeinfo.python
-                               .PickledByteArrayTypeInfo.PICKLED_BYTE_ARRAY_TYPE_INFO)
+        return PickledBytesTypeInfo()

Review comment:
       Why do we need to provide this static factory method without parameters? I guess you want to implement a singleton, but doing so will not achieve the effect
   

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -160,227 +193,193 @@ class SqlTimeTypeInfo(TypeInformation, ABC):
 
     @staticmethod
     def DATE():
-        return DateTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.DATE)
+        return DateTypeInformation()
 
     @staticmethod
     def TIME():
-        return TimeTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIME)
+        return TimeTypeInformation()
 
     @staticmethod
     def TIMESTAMP():
-        return TimestampTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIMESTAMP)
+        return TimestampTypeInformation()
 
 
-class PrimitiveArrayTypeInfo(WrapperTypeInfo, ABC):
+class PrimitiveArrayTypeInformation(TypeInformation, ABC):
     """
     A TypeInformation for arrays of primitive types (int, long, double, ...).
     Supports the creation of dedicated efficient serializers for these types.
     """
 
-    @staticmethod
-    def BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def BYTE_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __init__(self, element_type: TypeInformation):
+        self._element_type = element_type
+        super(PrimitiveArrayTypeInformation, self).__init__()
 
-    @staticmethod
-    def SHORT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def INT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO)
+    def get_java_type_info(self) -> JavaObject:
+        JPrimitiveArrayTypeInfo = get_gateway().jvm.org.apache.flink.api.common.typeinfo \
+            .PrimitiveArrayTypeInfo
+        if self._element_type == Types.BOOLEAN():
+            return JPrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.BYTE():
+            return JPrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.SHORT():
+            return JPrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.INT():
+            return JPrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.LONG():
+            return JPrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.FLOAT():
+            return JPrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.DOUBLE():
+            return JPrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.CHAR():
+            return JPrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO
+        else:
+            raise TypeError("Invalid element type for a primitive array.")
 
-    @staticmethod
-    def LONG_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __eq__(self, o) -> bool:
+        if isinstance(o, PrimitiveArrayTypeInformation):
+            return self._element_type == o._element_type
+        return False
 
-    @staticmethod
-    def FLOAT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __repr__(self) -> str:
+        return "PrimitiveArrayTypeInformation<%s>" % self._element_type
 
-    @staticmethod
-    def DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def CHAR_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO)
+def is_primitive_array_type_info(type_info: TypeInformation):
+    return isinstance(type_info, PrimitiveArrayTypeInformation)
 
 
-def is_primitive_array_type_info(type_info: TypeInformation):
-    return type_info in {
-        PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO()
-    }
-
-
-class BasicArrayTypeInfo(WrapperTypeInfo, ABC):
+class BasicArrayTypeInformation(TypeInformation, ABC):

Review comment:
       ```suggestion
   class BasicArrayTypeInformation(TypeInformation):
   ```

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -431,45 +430,47 @@ def from_internal_type(self, obj):
         return tuple(values)
 
 
-class TupleTypeInfo(WrapperTypeInfo):
+class TupleTypeInfo(TypeInformation):
     """
     TypeInformation for Tuple.
     """
 
     def __init__(self, types: List[TypeInformation]):
         self.types = types
+        super(TupleTypeInfo, self).__init__()
+
+    def get_field_types(self) -> List[TypeInformation]:
+        return self.types
+
+    def get_java_type_info(self) -> JavaObject:
         j_types_array = get_gateway().new_array(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.TypeInformation, len(types))
+            get_gateway().jvm.org.apache.flink.api.common.typeinfo.TypeInformation, len(self.types))
 
-        for i in range(len(types)):
-            field_type = types[i]
-            if isinstance(field_type, WrapperTypeInfo):
+        for i in range(len(self.types)):
+            field_type = self.types[i]
+            if isinstance(field_type, TypeInformation):
                 j_types_array[i] = field_type.get_java_type_info()
 
-        j_typeinfo = get_gateway().jvm \
+        self._j_typeinfo = get_gateway().jvm \
             .org.apache.flink.api.java.typeutils.TupleTypeInfo(j_types_array)
-        super(TupleTypeInfo, self).__init__(j_typeinfo=j_typeinfo)
-
-    def get_field_types(self) -> List[TypeInformation]:
-        return self.types
+        return self._j_typeinfo
 
     def __eq__(self, other) -> bool:
-        return self._j_typeinfo.equals(other._j_typeinfo)
-
-    def __hash__(self) -> int:
-        return self._j_typeinfo.hashCode()
+        if isinstance(other, TupleTypeInfo):
+            return self.types == other.types
+        return False
 
     def __str__(self) -> str:

Review comment:
       I think we need to override the `__repr__` method.
   
   

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -407,12 +406,12 @@ def to_internal_type(self, obj):
                 raise ValueError("Unexpected tuple %r with RowTypeInfo" % obj)
         else:
             if isinstance(obj, dict):
-                return tuple(obj.get(n) for n in self._j_typeinfo.getFieldNames())
+                return tuple(obj.get(n) for n in self.field_names)

Review comment:
       ```suggestion
                   return tuple(obj[n] for n in self.field_names)
   ```

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -407,12 +406,12 @@ def to_internal_type(self, obj):
                 raise ValueError("Unexpected tuple %r with RowTypeInfo" % obj)
         else:
             if isinstance(obj, dict):
-                return tuple(obj.get(n) for n in self._j_typeinfo.getFieldNames())
+                return tuple(obj.get(n) for n in self.field_names)
             elif isinstance(obj, (list, tuple)):
                 return tuple(obj)
             elif hasattr(obj, "__dict__"):
                 d = obj.__dict__
-                return tuple(d.get(n) for n in self._j_typeinfo.getFieldNames())
+                return tuple(d.get(n) for n in self.field_names)

Review comment:
       ```suggestion
                   return tuple(d[n] for n in self.field_names)
   ```

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -160,227 +193,193 @@ class SqlTimeTypeInfo(TypeInformation, ABC):
 
     @staticmethod
     def DATE():
-        return DateTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.DATE)
+        return DateTypeInformation()
 
     @staticmethod
     def TIME():
-        return TimeTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIME)
+        return TimeTypeInformation()
 
     @staticmethod
     def TIMESTAMP():
-        return TimestampTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIMESTAMP)
+        return TimestampTypeInformation()
 
 
-class PrimitiveArrayTypeInfo(WrapperTypeInfo, ABC):
+class PrimitiveArrayTypeInformation(TypeInformation, ABC):
     """
     A TypeInformation for arrays of primitive types (int, long, double, ...).
     Supports the creation of dedicated efficient serializers for these types.
     """
 
-    @staticmethod
-    def BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def BYTE_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __init__(self, element_type: TypeInformation):
+        self._element_type = element_type
+        super(PrimitiveArrayTypeInformation, self).__init__()
 
-    @staticmethod
-    def SHORT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def INT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO)
+    def get_java_type_info(self) -> JavaObject:
+        JPrimitiveArrayTypeInfo = get_gateway().jvm.org.apache.flink.api.common.typeinfo \
+            .PrimitiveArrayTypeInfo
+        if self._element_type == Types.BOOLEAN():
+            return JPrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.BYTE():
+            return JPrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.SHORT():
+            return JPrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.INT():
+            return JPrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.LONG():
+            return JPrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.FLOAT():
+            return JPrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.DOUBLE():
+            return JPrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.CHAR():
+            return JPrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO
+        else:
+            raise TypeError("Invalid element type for a primitive array.")
 
-    @staticmethod
-    def LONG_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __eq__(self, o) -> bool:
+        if isinstance(o, PrimitiveArrayTypeInformation):
+            return self._element_type == o._element_type
+        return False
 
-    @staticmethod
-    def FLOAT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __repr__(self) -> str:
+        return "PrimitiveArrayTypeInformation<%s>" % self._element_type
 
-    @staticmethod
-    def DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def CHAR_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO)
+def is_primitive_array_type_info(type_info: TypeInformation):
+    return isinstance(type_info, PrimitiveArrayTypeInformation)
 
 
-def is_primitive_array_type_info(type_info: TypeInformation):
-    return type_info in {
-        PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO()
-    }
-
-
-class BasicArrayTypeInfo(WrapperTypeInfo, ABC):
+class BasicArrayTypeInformation(TypeInformation, ABC):
     """
     A TypeInformation for arrays of boxed primitive types (Integer, Long, Double, ...).
     Supports the creation of dedicated efficient serializers for these types.
     """
-    @staticmethod
-    def BOOLEAN_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def BYTE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO)
+    def __init__(self, element_type: TypeInformation):
+        self._element_type = element_type
+        super(BasicArrayTypeInformation, self).__init__()
 
-    @staticmethod
-    def SHORT_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def INT_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def LONG_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO)
+    def get_java_type_info(self) -> JavaObject:
+        JBasicArrayTypeInfo = get_gateway().jvm.org.apache.flink.api.common.typeinfo \
+            .BasicArrayTypeInfo
+        if self._element_type == Types.BOOLEAN():
+            return JBasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO
+        elif self._element_type == Types.BYTE():
+            return JBasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.SHORT():
+            return JBasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO
+        elif self._element_type == Types.INT():
+            return JBasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
+        elif self._element_type == Types.LONG():
+            return JBasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO
+        elif self._element_type == Types.FLOAT():
+            return JBasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO
+        elif self._element_type == Types.DOUBLE():
+            return JBasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.CHAR():
+            return JBasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO
+        elif self._element_type == Types.STRING():
+            return JBasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO
+        else:
+            raise TypeError("Invalid element type for a primitive array.")
 
-    @staticmethod
-    def FLOAT_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO)
+    def __eq__(self, o) -> bool:
+        if isinstance(o, BasicArrayTypeInformation):
+            return self._element_type == o._element_type
+        return False
 
-    @staticmethod
-    def DOUBLE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO)
+    def __repr__(self):
+        return "BasicArrayTypeInformation<%s>" % self._element_type
 
-    @staticmethod
-    def CHAR_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def STRING_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)
+def is_basic_array_type_info(type_info: TypeInformation):
+    return isinstance(type_info, BasicArrayTypeInformation)
 
 
-def is_basic_array_type_info(type_info: TypeInformation):
-    return type_info in {
-        BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO()
-    }
-
-
-class PickledBytesTypeInfo(WrapperTypeInfo, ABC):
+class PickledBytesTypeInfo(TypeInformation, ABC):
     """
     A PickledBytesTypeInfo indicates the data is a primitive byte array generated by pickle
     serializer.
     """
 
     @staticmethod
     def PICKLED_BYTE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(get_gateway().jvm.org.apache.flink.streaming.api.typeinfo.python
-                               .PickledByteArrayTypeInfo.PICKLED_BYTE_ARRAY_TYPE_INFO)
+        return PickledBytesTypeInfo()
+
+    def get_java_type_info(self) -> JavaObject:
+        self._j_typeinfo = get_gateway().jvm.org.apache.flink.streaming.api.typeinfo.python\
+            .PickledByteArrayTypeInfo.PICKLED_BYTE_ARRAY_TYPE_INFO
+        return self._j_typeinfo
 
+    def __eq__(self, o: object) -> bool:
+        return isinstance(o, PickledBytesTypeInfo)
 
-class RowTypeInfo(WrapperTypeInfo):
+    def __repr__(self):
+        return "PickledByteArrayTypeInformation"
+
+
+class RowTypeInfo(TypeInformation):
     """
     TypeInformation for Row.
     """
 
     def __init__(self, types: List[TypeInformation], field_names: List[str] = None):
         self.types = types
         self.field_names = field_names
-        self.j_types_array = get_gateway().new_array(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.TypeInformation, len(types))
-        for i in range(len(types)):
-            wrapper_typeinfo = types[i]
-            if isinstance(wrapper_typeinfo, WrapperTypeInfo):
-                self.j_types_array[i] = wrapper_typeinfo.get_java_type_info()
-
-        if field_names is None:
-            self._j_typeinfo = get_gateway().jvm.org.apache.flink.api.java.typeutils.RowTypeInfo(
-                self.j_types_array)
-        else:
-            j_names_array = get_gateway().new_array(get_gateway().jvm.java.lang.String,
-                                                    len(field_names))
-            for i in range(len(field_names)):
-                j_names_array[i] = field_names[i]
-            self._j_typeinfo = get_gateway().jvm.org.apache.flink.api.java.typeutils.RowTypeInfo(
-                self.j_types_array, j_names_array)
-        self._need_conversion = [f.need_conversion() if isinstance(f, WrapperTypeInfo) else None
-                                 for f in types]
+        self._need_conversion = [f.need_conversion() if isinstance(f, TypeInformation) else None
+                                 for f in self.types]
         self._need_serialize_any_field = any(self._need_conversion)
-        super(RowTypeInfo, self).__init__(self._j_typeinfo)
+        super(RowTypeInfo, self).__init__()
 
     def get_field_names(self) -> List[str]:
-        j_field_names = self._j_typeinfo.getFieldNames()
-        field_names = [name for name in j_field_names]
-        return field_names
+        if not self.field_names:
+            j_field_names = self.get_java_type_info().getFieldNames()
+            self.field_names = [name for name in j_field_names]
+        return self.field_names
 
     def get_field_index(self, field_name: str) -> int:
-        return self._j_typeinfo.getFieldIndex(field_name)
+        if self.field_names:
+            return self.field_names.index(field_name)
+        return -1
 
     def get_field_types(self) -> List[TypeInformation]:
         return self.types
 
-    def __eq__(self, other) -> bool:
-        return self._j_typeinfo.equals(other._j_typeinfo)
+    def get_java_type_info(self) -> JavaObject:
+        if not self._j_typeinfo:
+            j_types_array = get_gateway()\
+                .new_array(get_gateway().jvm.org.apache.flink.api.common.typeinfo.TypeInformation,
+                           len(self.types))
+            for i in range(len(self.types)):
+                wrapper_typeinfo = self.types[i]
+                if isinstance(wrapper_typeinfo, TypeInformation):
+                    j_types_array[i] = wrapper_typeinfo.get_java_type_info()
+
+            if self.field_names is None:
+                self._j_typeinfo = get_gateway().jvm\
+                    .org.apache.flink.api.java.typeutils.RowTypeInfo(j_types_array)
+            else:
+                j_names_array = get_gateway().new_array(get_gateway().jvm.java.lang.String,
+                                                        len(self.field_names))
+                for i in range(len(self.field_names)):
+                    j_names_array[i] = self.field_names[i]
+                self._j_typeinfo = get_gateway().jvm\
+                    .org.apache.flink.api.java.typeutils.RowTypeInfo(j_types_array, j_names_array)
+        return self._j_typeinfo
 
-    def __hash__(self) -> int:
-        return self._j_typeinfo.hashCode()
+    def __eq__(self, other) -> bool:
+        if isinstance(other, RowTypeInfo):
+            return self.types == other.types
+        return False
 
     def __str__(self) -> str:

Review comment:
       I think we need to override the `__repr__` method.

##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -160,227 +193,193 @@ class SqlTimeTypeInfo(TypeInformation, ABC):
 
     @staticmethod
     def DATE():
-        return DateTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.DATE)
+        return DateTypeInformation()
 
     @staticmethod
     def TIME():
-        return TimeTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIME)
+        return TimeTypeInformation()
 
     @staticmethod
     def TIMESTAMP():
-        return TimestampTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIMESTAMP)
+        return TimestampTypeInformation()
 
 
-class PrimitiveArrayTypeInfo(WrapperTypeInfo, ABC):
+class PrimitiveArrayTypeInformation(TypeInformation, ABC):
     """
     A TypeInformation for arrays of primitive types (int, long, double, ...).
     Supports the creation of dedicated efficient serializers for these types.
     """
 
-    @staticmethod
-    def BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def BYTE_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __init__(self, element_type: TypeInformation):
+        self._element_type = element_type
+        super(PrimitiveArrayTypeInformation, self).__init__()
 
-    @staticmethod
-    def SHORT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def INT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO)
+    def get_java_type_info(self) -> JavaObject:
+        JPrimitiveArrayTypeInfo = get_gateway().jvm.org.apache.flink.api.common.typeinfo \
+            .PrimitiveArrayTypeInfo
+        if self._element_type == Types.BOOLEAN():
+            return JPrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.BYTE():
+            return JPrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.SHORT():
+            return JPrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.INT():
+            return JPrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.LONG():
+            return JPrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.FLOAT():
+            return JPrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.DOUBLE():
+            return JPrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.CHAR():
+            return JPrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO
+        else:
+            raise TypeError("Invalid element type for a primitive array.")
 
-    @staticmethod
-    def LONG_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __eq__(self, o) -> bool:
+        if isinstance(o, PrimitiveArrayTypeInformation):
+            return self._element_type == o._element_type
+        return False
 
-    @staticmethod
-    def FLOAT_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO)
+    def __repr__(self) -> str:
+        return "PrimitiveArrayTypeInformation<%s>" % self._element_type
 
-    @staticmethod
-    def DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def CHAR_PRIMITIVE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO)
+def is_primitive_array_type_info(type_info: TypeInformation):
+    return isinstance(type_info, PrimitiveArrayTypeInformation)
 
 
-def is_primitive_array_type_info(type_info: TypeInformation):
-    return type_info in {
-        PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO(),
-        PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO()
-    }
-
-
-class BasicArrayTypeInfo(WrapperTypeInfo, ABC):
+class BasicArrayTypeInformation(TypeInformation, ABC):
     """
     A TypeInformation for arrays of boxed primitive types (Integer, Long, Double, ...).
     Supports the creation of dedicated efficient serializers for these types.
     """
-    @staticmethod
-    def BOOLEAN_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def BYTE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO)
+    def __init__(self, element_type: TypeInformation):
+        self._element_type = element_type
+        super(BasicArrayTypeInformation, self).__init__()
 
-    @staticmethod
-    def SHORT_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def INT_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO)
-
-    @staticmethod
-    def LONG_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO)
+    def get_java_type_info(self) -> JavaObject:
+        JBasicArrayTypeInfo = get_gateway().jvm.org.apache.flink.api.common.typeinfo \
+            .BasicArrayTypeInfo
+        if self._element_type == Types.BOOLEAN():
+            return JBasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO
+        elif self._element_type == Types.BYTE():
+            return JBasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.SHORT():
+            return JBasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO
+        elif self._element_type == Types.INT():
+            return JBasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
+        elif self._element_type == Types.LONG():
+            return JBasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO
+        elif self._element_type == Types.FLOAT():
+            return JBasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO
+        elif self._element_type == Types.DOUBLE():
+            return JBasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO
+        elif self._element_type == Types.CHAR():
+            return JBasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO
+        elif self._element_type == Types.STRING():
+            return JBasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO
+        else:
+            raise TypeError("Invalid element type for a primitive array.")
 
-    @staticmethod
-    def FLOAT_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO)
+    def __eq__(self, o) -> bool:
+        if isinstance(o, BasicArrayTypeInformation):
+            return self._element_type == o._element_type
+        return False
 
-    @staticmethod
-    def DOUBLE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO)
+    def __repr__(self):
+        return "BasicArrayTypeInformation<%s>" % self._element_type
 
-    @staticmethod
-    def CHAR_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO)
 
-    @staticmethod
-    def STRING_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo
-            .BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)
+def is_basic_array_type_info(type_info: TypeInformation):
+    return isinstance(type_info, BasicArrayTypeInformation)
 
 
-def is_basic_array_type_info(type_info: TypeInformation):
-    return type_info in {
-        BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO(),
-        BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO()
-    }
-
-
-class PickledBytesTypeInfo(WrapperTypeInfo, ABC):
+class PickledBytesTypeInfo(TypeInformation, ABC):
     """
     A PickledBytesTypeInfo indicates the data is a primitive byte array generated by pickle
     serializer.
     """
 
     @staticmethod
     def PICKLED_BYTE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(get_gateway().jvm.org.apache.flink.streaming.api.typeinfo.python
-                               .PickledByteArrayTypeInfo.PICKLED_BYTE_ARRAY_TYPE_INFO)
+        return PickledBytesTypeInfo()
+
+    def get_java_type_info(self) -> JavaObject:
+        self._j_typeinfo = get_gateway().jvm.org.apache.flink.streaming.api.typeinfo.python\
+            .PickledByteArrayTypeInfo.PICKLED_BYTE_ARRAY_TYPE_INFO
+        return self._j_typeinfo
 
+    def __eq__(self, o: object) -> bool:
+        return isinstance(o, PickledBytesTypeInfo)
 
-class RowTypeInfo(WrapperTypeInfo):
+    def __repr__(self):
+        return "PickledByteArrayTypeInformation"
+
+
+class RowTypeInfo(TypeInformation):
     """
     TypeInformation for Row.
     """
 
     def __init__(self, types: List[TypeInformation], field_names: List[str] = None):
         self.types = types
         self.field_names = field_names
-        self.j_types_array = get_gateway().new_array(
-            get_gateway().jvm.org.apache.flink.api.common.typeinfo.TypeInformation, len(types))
-        for i in range(len(types)):
-            wrapper_typeinfo = types[i]
-            if isinstance(wrapper_typeinfo, WrapperTypeInfo):
-                self.j_types_array[i] = wrapper_typeinfo.get_java_type_info()
-
-        if field_names is None:
-            self._j_typeinfo = get_gateway().jvm.org.apache.flink.api.java.typeutils.RowTypeInfo(
-                self.j_types_array)
-        else:
-            j_names_array = get_gateway().new_array(get_gateway().jvm.java.lang.String,
-                                                    len(field_names))
-            for i in range(len(field_names)):
-                j_names_array[i] = field_names[i]
-            self._j_typeinfo = get_gateway().jvm.org.apache.flink.api.java.typeutils.RowTypeInfo(
-                self.j_types_array, j_names_array)
-        self._need_conversion = [f.need_conversion() if isinstance(f, WrapperTypeInfo) else None
-                                 for f in types]
+        self._need_conversion = [f.need_conversion() if isinstance(f, TypeInformation) else None
+                                 for f in self.types]
         self._need_serialize_any_field = any(self._need_conversion)
-        super(RowTypeInfo, self).__init__(self._j_typeinfo)
+        super(RowTypeInfo, self).__init__()
 
     def get_field_names(self) -> List[str]:
-        j_field_names = self._j_typeinfo.getFieldNames()
-        field_names = [name for name in j_field_names]
-        return field_names
+        if not self.field_names:
+            j_field_names = self.get_java_type_info().getFieldNames()
+            self.field_names = [name for name in j_field_names]
+        return self.field_names
 
     def get_field_index(self, field_name: str) -> int:
-        return self._j_typeinfo.getFieldIndex(field_name)
+        if self.field_names:
+            return self.field_names.index(field_name)
+        return -1
 
     def get_field_types(self) -> List[TypeInformation]:
         return self.types
 
-    def __eq__(self, other) -> bool:
-        return self._j_typeinfo.equals(other._j_typeinfo)
+    def get_java_type_info(self) -> JavaObject:
+        if not self._j_typeinfo:
+            j_types_array = get_gateway()\
+                .new_array(get_gateway().jvm.org.apache.flink.api.common.typeinfo.TypeInformation,
+                           len(self.types))
+            for i in range(len(self.types)):
+                wrapper_typeinfo = self.types[i]

Review comment:
       `wrapper_typeinfo` -> `field_typeinfo`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org