You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/08/10 11:02:17 UTC

[flink] branch master updated: [hotfix][python] Move json/avro/csv SerializationSchema implementations into the corresponding files

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 07858933aac [hotfix][python] Move json/avro/csv SerializationSchema implementations into the corresponding files
07858933aac is described below

commit 07858933aac71715f9f39901cab7716d298dd28f
Author: Dian Fu <di...@apache.org>
AuthorDate: Wed Aug 10 11:29:52 2022 +0800

    [hotfix][python] Move json/avro/csv SerializationSchema implementations into the corresponding files
---
 .../docs/connectors/datastream/filesystem.md       |   2 +-
 .../docs/connectors/datastream/formats/csv.md      |   4 +-
 .../python/datastream/intro_to_datastream_api.md   |   4 +-
 .../docs/connectors/datastream/filesystem.md       |   2 +-
 .../docs/connectors/datastream/formats/csv.md      |   4 +-
 .../python/datastream/intro_to_datastream_api.md   |   4 +-
 .../python/datastream/data_stream_job.py           |   3 +-
 flink-python/pyflink/common/__init__.py            |  50 ++-
 flink-python/pyflink/common/io.py                  |  32 ++
 flink-python/pyflink/common/serialization.py       | 345 ++-------------------
 .../common/tests/test_serialization_schemas.py     |  86 +----
 flink-python/pyflink/common/utils.py               |  25 ++
 flink-python/pyflink/datastream/__init__.py        |  17 +-
 .../pyflink/datastream/connectors/file_system.py   |  42 +--
 .../datastream/connectors/tests/test_kafka.py      |   7 +-
 .../datastream/connectors/tests/test_rabbitmq.py   |   3 +-
 flink-python/pyflink/datastream/formats/avro.py    |  99 +++++-
 flink-python/pyflink/datastream/formats/csv.py     | 170 ++++++++--
 flink-python/pyflink/datastream/formats/json.py    | 150 +++++++++
 flink-python/pyflink/datastream/formats/orc.py     |  24 +-
 flink-python/pyflink/datastream/formats/parquet.py |  37 ++-
 .../pyflink/datastream/formats/tests/test_avro.py  |   6 +-
 .../pyflink/datastream/formats/tests/test_csv.py   |  56 +++-
 .../pyflink/datastream/formats/tests/test_json.py  |  57 ++++
 .../datastream/formats/tests/test_parquet.py       |   4 +-
 .../datastream/stream_execution_environment.py     |   7 +-
 .../tests/test_stream_execution_environment.py     |   2 +-
 flink-python/pyflink/datastream/utils.py           |   9 -
 .../datastream/connectors/kafka_avro_format.py     |   3 +-
 .../datastream/connectors/kafka_csv_format.py      |   4 +-
 .../datastream/connectors/kafka_json_format.py     |   3 +-
 31 files changed, 709 insertions(+), 552 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md b/docs/content.zh/docs/connectors/datastream/filesystem.md
index d48e84f52d8..20d7a0bee38 100644
--- a/docs/content.zh/docs/connectors/datastream/filesystem.md
+++ b/docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -570,7 +570,7 @@ data_stream = ...
 
 avro_type_info = GenericRecordAvroTypeInfo(schema)
 sink = FileSink \
-    .for_bulk_format(OUTPUT_BASE_PATH, AvroWriters.for_generic_record(schema)) \
+    .for_bulk_format(OUTPUT_BASE_PATH, AvroBulkWriters.for_generic_record(schema)) \
     .build()
 
 # 必须通过 map 操作来指定其 Avro 类型信息,用于数据的序列化
diff --git a/docs/content.zh/docs/connectors/datastream/formats/csv.md b/docs/content.zh/docs/connectors/datastream/formats/csv.md
index 568757f7137..2d3ef10455f 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/csv.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/csv.md
@@ -138,7 +138,7 @@ The corresponding CSV file:
 
 Similarly to the `TextLineInputFormat`, `CsvReaderFormat` can be used in both continues and batch modes (see [TextLineInputFormat]({{< ref "docs/connectors/datastream/formats/text_files" >}})  for examples).
 
-For PyFlink users, `CsvBulkWriter` could be used to create `BulkWriterFactory` to write `Row` records to files in CSV format.
+For PyFlink users, `CsvBulkWriters` could be used to create `BulkWriterFactory` to write `Row` records to files in CSV format.
 It should be noted that if the preceding operator of sink is an operator which produces `RowData` records, e.g. CSV source, it needs to be converted to `Row` records before writing to sink.
 ```python
 schema = CsvSchema.builder()
@@ -148,7 +148,7 @@ schema = CsvSchema.builder()
     .build()
 
 sink = FileSink.for_bulk_format(
-    OUTPUT_DIR, CsvBulkWriter.for_schema(schema)).build()
+    OUTPUT_DIR, CsvBulkWriters.for_schema(schema)).build()
 
 # If ds is a source stream producing RowData records, a map could be added to help converting RowData records into Row records.
 ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
diff --git a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md
index 5eefc0376b2..d87e1f4ee2e 100644
--- a/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md
+++ b/docs/content.zh/docs/dev/python/datastream/intro_to_datastream_api.md
@@ -154,10 +154,10 @@ will be `Types.PICKLED_BYTE_ARRAY()`.
 You can also create a `DataStream` using DataStream connectors with method `add_source` as following:
 
 ```python
-from pyflink.common.serialization import JsonRowDeserializationSchema
 from pyflink.common.typeinfo import Types
 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
+from pyflink.datastream.formats.json import JsonRowDeserializationSchema
 
 env = StreamExecutionEnvironment.get_execution_environment()
 # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
@@ -305,7 +305,7 @@ You can call the `add_sink` method to emit the data of a `DataStream` to a DataS
 ```python
 from pyflink.common.typeinfo import Types
 from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
-from pyflink.common.serialization import JsonRowSerializationSchema
+from pyflink.datastream.formats.json import JsonRowSerializationSchema
 
 serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
     type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
diff --git a/docs/content/docs/connectors/datastream/filesystem.md b/docs/content/docs/connectors/datastream/filesystem.md
index 41eb054015e..e5e39646a43 100644
--- a/docs/content/docs/connectors/datastream/filesystem.md
+++ b/docs/content/docs/connectors/datastream/filesystem.md
@@ -569,7 +569,7 @@ data_stream = ...
 
 avro_type_info = GenericRecordAvroTypeInfo(schema)
 sink = FileSink \
-    .for_bulk_format(OUTPUT_BASE_PATH, AvroWriters.for_generic_record(schema)) \
+    .for_bulk_format(OUTPUT_BASE_PATH, AvroBulkWriters.for_generic_record(schema)) \
     .build()
 
 # A map to indicate its Avro type info is necessary for serialization
diff --git a/docs/content/docs/connectors/datastream/formats/csv.md b/docs/content/docs/connectors/datastream/formats/csv.md
index 3b9a374635d..60cacc9d5c8 100644
--- a/docs/content/docs/connectors/datastream/formats/csv.md
+++ b/docs/content/docs/connectors/datastream/formats/csv.md
@@ -138,7 +138,7 @@ The corresponding CSV file:
 
 Similarly to the `TextLineInputFormat`, `CsvReaderFormat` can be used in both continues and batch modes (see [TextLineInputFormat]({{< ref "docs/connectors/datastream/formats/text_files" >}})  for examples).
 
-For PyFlink users, `CsvBulkWriter` could be used to create `BulkWriterFactory` to write `Row` records to files in CSV format.
+For PyFlink users, `CsvBulkWriters` could be used to create `BulkWriterFactory` to write `Row` records to files in CSV format.
 It should be noted that if the preceding operator of sink is an operator which produces `RowData` records, e.g. CSV source, it needs to be converted to `Row` records before writing to sink.
 ```python
 schema = CsvSchema.builder() \
@@ -148,7 +148,7 @@ schema = CsvSchema.builder() \
     .build()
 
 sink = FileSink.for_bulk_format(
-    OUTPUT_DIR, CsvBulkWriter.for_schema(schema)).build()
+    OUTPUT_DIR, CsvBulkWriters.for_schema(schema)).build()
 
 # If ds is a source stream producing RowData records, a map could be added to help converting RowData records into Row records.
 ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
diff --git a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md
index daedb0f418b..61b0d56b451 100644
--- a/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md
+++ b/docs/content/docs/dev/python/datastream/intro_to_datastream_api.md
@@ -154,10 +154,10 @@ will be `Types.PICKLED_BYTE_ARRAY()`.
 You can also create a `DataStream` using DataStream connectors with method `add_source` as following:
 
 ```python
-from pyflink.common.serialization import JsonRowDeserializationSchema
 from pyflink.common.typeinfo import Types
 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
+from pyflink.datastream.formats.json import JsonRowDeserializationSchema
 
 env = StreamExecutionEnvironment.get_execution_environment()
 # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
@@ -305,7 +305,7 @@ You can call the `add_sink` method to emit the data of a `DataStream` to a DataS
 ```python
 from pyflink.common.typeinfo import Types
 from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
-from pyflink.common.serialization import JsonRowSerializationSchema
+from pyflink.datastream.formats.json import JsonRowSerializationSchema
 
 serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
     type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
diff --git a/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py b/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
index 6bb27d0d9cd..880c71e3145 100644
--- a/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
+++ b/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
@@ -19,11 +19,12 @@
 from typing import Any
 
 from pyflink.common import Duration
-from pyflink.common.serialization import SimpleStringSchema, JsonRowDeserializationSchema
+from pyflink.common.serialization import SimpleStringSchema
 from pyflink.common.typeinfo import Types
 from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
 from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
 from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.formats.json import JsonRowDeserializationSchema
 from pyflink.datastream.functions import KeyedProcessFunction
 
 from functions import MyKeySelector
diff --git a/flink-python/pyflink/common/__init__.py b/flink-python/pyflink/common/__init__.py
index f120c406b43..19f76febc7d 100644
--- a/flink-python/pyflink/common/__init__.py
+++ b/flink-python/pyflink/common/__init__.py
@@ -39,10 +39,14 @@ Common classes used by both Flink DataStream API and Table API:
     - :class:`SerializationSchema`:
       Base class to describes how to turn a data object into a different serialized representation.
       Most data sinks (for example Apache Kafka) require the data to be handed to them in a specific
-      format (for example as byte strings). See :class:`JsonRowSerializationSchema`,
-      :class:`JsonRowDeserializationSchema`, :class:`CsvRowSerializationSchema`,
-      :class:`CsvRowDeserializationSchema`, :class:`AvroRowSerializationSchema`,
-      :class:`AvroRowDeserializationSchema` and :class:`SimpleStringSchema` for more details.
+      format (for example as byte strings). See
+      :class:`~pyflink.datastream.formats.json.JsonRowSerializationSchema`,
+      :class:`~pyflink.datastream.formats.json.JsonRowDeserializationSchema`,
+      :class:`~pyflink.datastream.formats.csv.CsvRowSerializationSchema`,
+      :class:`~pyflink.datastream.formats.csv.CsvRowDeserializationSchema`,
+      :class:`~pyflink.datastream.formats.avro.AvroRowSerializationSchema`,
+      :class:`~pyflink.datastream.formats.avro.AvroRowDeserializationSchema` and
+      :class:`~SimpleStringSchema` for more details.
 """
 from pyflink.common.completable_future import CompletableFuture
 from pyflink.common.config_options import ConfigOption, ConfigOptions
@@ -56,9 +60,7 @@ from pyflink.common.job_id import JobID
 from pyflink.common.job_status import JobStatus
 from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
 from pyflink.common.serialization import SerializationSchema, DeserializationSchema, \
-    SimpleStringSchema, JsonRowSerializationSchema, JsonRowDeserializationSchema, \
-    CsvRowSerializationSchema, CsvRowDeserializationSchema, AvroRowSerializationSchema, \
-    AvroRowDeserializationSchema, Encoder
+    SimpleStringSchema, Encoder
 from pyflink.common.serializer import TypeSerializer
 from pyflink.common.typeinfo import Types, TypeInformation
 from pyflink.common.types import Row, RowKind
@@ -77,12 +79,6 @@ __all__ = [
     'SerializationSchema',
     'DeserializationSchema',
     'SimpleStringSchema',
-    'JsonRowSerializationSchema',
-    'JsonRowDeserializationSchema',
-    'CsvRowSerializationSchema',
-    'CsvRowDeserializationSchema',
-    'AvroRowSerializationSchema',
-    'AvroRowDeserializationSchema',
     'Encoder',
     'CompletableFuture',
     'ExecutionMode',
@@ -100,5 +96,31 @@ __all__ = [
     "Instant",
     "Time",
     "AssignerWithPeriodicWatermarksWrapper"
-    ""
 ]
+
+
+def _install():
+    from pyflink import common
+
+    # json
+    from pyflink.datastream.formats.json import JsonRowDeserializationSchema
+    from pyflink.datastream.formats.json import JsonRowSerializationSchema
+    setattr(common, 'JsonRowDeserializationSchema', JsonRowDeserializationSchema)
+    setattr(common, 'JsonRowSerializationSchema', JsonRowSerializationSchema)
+
+    # csv
+    from pyflink.datastream.formats.csv import CsvRowDeserializationSchema
+    from pyflink.datastream.formats.csv import CsvRowSerializationSchema
+    setattr(common, 'CsvRowDeserializationSchema', CsvRowDeserializationSchema)
+    setattr(common, 'CsvRowSerializationSchema', CsvRowSerializationSchema)
+
+    # avro
+    from pyflink.datastream.formats.avro import AvroRowDeserializationSchema
+    from pyflink.datastream.formats.avro import AvroRowSerializationSchema
+    setattr(common, 'AvroRowDeserializationSchema', AvroRowDeserializationSchema)
+    setattr(common, 'AvroRowSerializationSchema', AvroRowSerializationSchema)
+
+
+# for backward compatibility
+_install()
+del _install
diff --git a/flink-python/pyflink/common/io.py b/flink-python/pyflink/common/io.py
new file mode 100644
index 00000000000..97cf803685c
--- /dev/null
+++ b/flink-python/pyflink/common/io.py
@@ -0,0 +1,32 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common.utils import JavaObjectWrapper
+
+__all__ = [
+    'InputFormat'
+]
+
+
+class InputFormat(JavaObjectWrapper):
+    """
+    The Python wrapper of Java InputFormat interface, which is the base interface for data sources
+    that produce records.
+    """
+
+    def __init__(self, j_input_format):
+        super().__init__(j_input_format)
diff --git a/flink-python/pyflink/common/serialization.py b/flink-python/pyflink/common/serialization.py
index e70bb8ffd3a..23613e25cc7 100644
--- a/flink-python/pyflink/common/serialization.py
+++ b/flink-python/pyflink/common/serialization.py
@@ -15,18 +15,17 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from py4j.java_gateway import java_import
-from pyflink.common import typeinfo
-from pyflink.common.typeinfo import TypeInformation
-
-from pyflink.util.java_utils import load_java_class
 
+from pyflink.common.utils import JavaObjectWrapper
 from pyflink.java_gateway import get_gateway
 
-__all__ = ['SerializationSchema', 'DeserializationSchema', 'SimpleStringSchema',
-           'JsonRowSerializationSchema', 'JsonRowDeserializationSchema',
-           'CsvRowSerializationSchema', 'CsvRowDeserializationSchema',
-           'AvroRowSerializationSchema', 'AvroRowDeserializationSchema', 'Encoder']
+__all__ = [
+    'SerializationSchema',
+    'DeserializationSchema',
+    'SimpleStringSchema',
+    'Encoder',
+    'BulkWriterFactory'
+]
 
 
 class SerializationSchema(object):
@@ -69,310 +68,6 @@ class SimpleStringSchema(SerializationSchema, DeserializationSchema):
             self, j_deserialization_schema=j_simple_string_serialization_schema)
 
 
-class JsonRowDeserializationSchema(DeserializationSchema):
-    """
-    Deserialization schema from JSON to Flink types.
-
-    Deserializes a byte[] message as a JSON object and reads the specified fields.
-
-    Failures during deserialization are forwarded as wrapped IOExceptions.
-    """
-    def __init__(self, j_deserialization_schema):
-        super(JsonRowDeserializationSchema, self).__init__(j_deserialization_schema)
-
-    @staticmethod
-    def builder():
-        """
-        A static method to get a Builder for JsonRowDeserializationSchema.
-        """
-        return JsonRowDeserializationSchema.Builder()
-
-    class Builder(object):
-        """
-        Builder for JsonRowDeserializationSchema.
-        """
-
-        def __init__(self):
-            self._type_info = None
-            self._fail_on_missing_field = False
-            self._ignore_parse_errors = False
-
-        def type_info(self, type_info: TypeInformation):
-            """
-            Creates a JSON deserialization schema for the given type information.
-
-            :param type_info: Type information describing the result type. The field names of Row
-                              are used to parse the JSON properties.
-            """
-            self._type_info = type_info
-            return self
-
-        def json_schema(self, json_schema: str):
-            """
-            Creates a JSON deserialization schema for the given JSON schema.
-
-            :param json_schema: JSON schema describing the result type.
-            """
-            if json_schema is None:
-                raise TypeError("The json_schema must not be None.")
-            j_type_info = get_gateway().jvm \
-                .org.apache.flink.formats.json.JsonRowSchemaConverter.convert(json_schema)
-            self._type_info = typeinfo._from_java_type(j_type_info)
-            return self
-
-        def fail_on_missing_field(self):
-            """
-            Configures schema to fail if a JSON field is missing. A missing field is ignored and the
-            field is set to null by default.
-            """
-            self._fail_on_missing_field = True
-            return self
-
-        def ignore_parse_errors(self):
-            """
-            Configures schema to fail when parsing json failed. An exception will be thrown when
-            parsing json fails.
-            """
-            self._ignore_parse_errors = True
-            return self
-
-        def build(self):
-            JBuilder = get_gateway().jvm.org.apache.flink.formats.json.JsonRowDeserializationSchema\
-                .Builder
-            j_builder = JBuilder(self._type_info.get_java_type_info())
-
-            if self._fail_on_missing_field:
-                j_builder = j_builder.fialOnMissingField()
-
-            if self._ignore_parse_errors:
-                j_builder = j_builder.ignoreParseErrors()
-
-            j_deserialization_schema = j_builder.build()
-            return JsonRowDeserializationSchema(j_deserialization_schema=j_deserialization_schema)
-
-
-class JsonRowSerializationSchema(SerializationSchema):
-    """
-    Serialization schema that serializes an object of Flink types into a JSON bytes. Serializes the
-    input Flink object into a JSON string and converts it into byte[].
-
-    Result byte[] message can be deserialized using JsonRowDeserializationSchema.
-    """
-
-    def __init__(self, j_serialization_schema):
-        super(JsonRowSerializationSchema, self).__init__(j_serialization_schema)
-
-    @staticmethod
-    def builder():
-        return JsonRowSerializationSchema.Builder()
-
-    class Builder(object):
-        """
-        Builder for JsonRowSerializationSchema.
-        """
-        def __init__(self):
-            self._type_info = None
-
-        def with_type_info(self, type_info: TypeInformation):
-            """
-            Creates a JSON serialization schema for the given type information.
-
-            :param type_info: Type information describing the result type. The field names of Row
-                              are used to parse the JSON properties.
-            """
-            self._type_info = type_info
-            return self
-
-        def build(self):
-            if self._type_info is None:
-                raise TypeError("Typeinfo should be set.")
-
-            j_builder = get_gateway().jvm \
-                .org.apache.flink.formats.json.JsonRowSerializationSchema.builder()
-
-            j_schema = j_builder.withTypeInfo(self._type_info.get_java_type_info()).build()
-            return JsonRowSerializationSchema(j_serialization_schema=j_schema)
-
-
-class CsvRowDeserializationSchema(DeserializationSchema):
-    """
-    Deserialization schema from CSV to Flink types. Deserializes a byte[] message as a JsonNode and
-    converts it to Row.
-
-    Failure during deserialization are forwarded as wrapped IOException.
-    """
-
-    def __init__(self, j_deserialization_schema):
-        super(CsvRowDeserializationSchema, self).__init__(
-            j_deserialization_schema=j_deserialization_schema)
-
-    class Builder(object):
-        """
-        A builder for creating a CsvRowDeserializationSchema.
-        """
-        def __init__(self, type_info: TypeInformation):
-            if type_info is None:
-                raise TypeError("Type information must not be None")
-            self._j_builder = get_gateway().jvm\
-                .org.apache.flink.formats.csv.CsvRowDeserializationSchema.Builder(
-                type_info.get_java_type_info())
-
-        def set_field_delimiter(self, delimiter: str):
-            self._j_builder = self._j_builder.setFieldDelimiter(delimiter)
-            return self
-
-        def set_allow_comments(self, allow_comments: bool):
-            self._j_builder = self._j_builder.setAllowComments(allow_comments)
-            return self
-
-        def set_array_element_delimiter(self, delimiter: str):
-            self._j_builder = self._j_builder.setArrayElementDelimiter(delimiter)
-            return self
-
-        def set_quote_character(self, c: str):
-            self._j_builder = self._j_builder.setQuoteCharacter(c)
-            return self
-
-        def set_escape_character(self, c: str):
-            self._j_builder = self._j_builder.setEscapeCharacter(c)
-            return self
-
-        def set_null_literal(self, null_literal: str):
-            self._j_builder = self._j_builder.setNullLiteral(null_literal)
-            return self
-
-        def set_ignore_parse_errors(self, ignore_parse_errors: bool):
-            self._j_builder = self._j_builder.setIgnoreParseErrors(ignore_parse_errors)
-            return self
-
-        def build(self):
-            j_csv_row_deserialization_schema = self._j_builder.build()
-            return CsvRowDeserializationSchema(
-                j_deserialization_schema=j_csv_row_deserialization_schema)
-
-
-class CsvRowSerializationSchema(SerializationSchema):
-    """
-    Serialization schema that serializes an object of Flink types into a CSV bytes. Serializes the
-    input row into an ObjectNode and converts it into byte[].
-
-    Result byte[] messages can be deserialized using CsvRowDeserializationSchema.
-    """
-    def __init__(self, j_csv_row_serialization_schema):
-        super(CsvRowSerializationSchema, self).__init__(j_csv_row_serialization_schema)
-
-    class Builder(object):
-        """
-        A builder for creating a CsvRowSerializationSchema.
-        """
-        def __init__(self, type_info: TypeInformation):
-            if type_info is None:
-                raise TypeError("Type information must not be None")
-            self._j_builder = get_gateway().jvm\
-                .org.apache.flink.formats.csv.CsvRowSerializationSchema.Builder(
-                type_info.get_java_type_info())
-
-        def set_field_delimiter(self, c: str):
-            self._j_builder = self._j_builder.setFieldDelimiter(c)
-            return self
-
-        def set_line_delimiter(self, delimiter: str):
-            self._j_builder = self._j_builder.setLineDelimiter(delimiter)
-            return self
-
-        def set_array_element_delimiter(self, delimiter: str):
-            self._j_builder = self._j_builder.setArrayElementDelimiter(delimiter)
-            return self
-
-        def disable_quote_character(self):
-            self._j_builder = self._j_builder.disableQuoteCharacter()
-            return self
-
-        def set_quote_character(self, c: str):
-            self._j_builder = self._j_builder.setQuoteCharacter(c)
-            return self
-
-        def set_escape_character(self, c: str):
-            self._j_builder = self._j_builder.setEscapeCharacter(c)
-            return self
-
-        def set_null_literal(self, s: str):
-            self._j_builder = self._j_builder.setNullLiteral(s)
-            return self
-
-        def build(self):
-            j_serialization_schema = self._j_builder.build()
-            return CsvRowSerializationSchema(j_csv_row_serialization_schema=j_serialization_schema)
-
-
-class AvroRowDeserializationSchema(DeserializationSchema):
-    """
-    Deserialization schema from Avro bytes to Row. Deserializes the byte[] messages into (nested)
-    Flink rows. It converts Avro types into types that are compatible with Flink's Table & SQL API.
-
-    Projects with Avro records containing logical date/time types need to add a JodaTime dependency.
-    """
-    def __init__(self, record_class: str = None, avro_schema_string: str = None):
-        """
-        Creates an Avro deserialization schema for the given specific record class or Avro schema
-        string. Having the concrete Avro record class might improve performance.
-
-        :param record_class: Avro record class used to deserialize Avro's record to Flink's row.
-        :param avro_schema_string: Avro schema string to deserialize Avro's record to Flink's row.
-        """
-
-        if avro_schema_string is None and record_class is None:
-            raise TypeError("record_class or avro_schema_string should be specified.")
-        j_deserialization_schema = None
-        if record_class is not None:
-            gateway = get_gateway()
-            java_import(gateway.jvm, record_class)
-            j_record_class = load_java_class(record_class)
-            JAvroRowDeserializationSchema = get_gateway().jvm \
-                .org.apache.flink.formats.avro.AvroRowDeserializationSchema
-            j_deserialization_schema = JAvroRowDeserializationSchema(j_record_class)
-
-        elif avro_schema_string is not None:
-            JAvroRowDeserializationSchema = get_gateway().jvm \
-                .org.apache.flink.formats.avro.AvroRowDeserializationSchema
-            j_deserialization_schema = JAvroRowDeserializationSchema(avro_schema_string)
-
-        super(AvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
-
-
-class AvroRowSerializationSchema(SerializationSchema):
-    """
-    Serialization schema that serializes to Avro binary format.
-    """
-
-    def __init__(self, record_class: str = None, avro_schema_string: str = None):
-        """
-        Creates AvroSerializationSchema that serializes SpecificRecord using provided schema or
-        record class.
-
-        :param record_class: Avro record class used to serialize  Flink's row to Avro's record.
-        :param avro_schema_string: Avro schema string to serialize Flink's row to Avro's record.
-        """
-        if avro_schema_string is None and record_class is None:
-            raise TypeError("record_class or avro_schema_string should be specified.")
-
-        j_serialization_schema = None
-        if record_class is not None:
-            gateway = get_gateway()
-            java_import(gateway.jvm, record_class)
-            j_record_class = load_java_class(record_class)
-            JAvroRowSerializationSchema = get_gateway().jvm \
-                .org.apache.flink.formats.avro.AvroRowSerializationSchema
-            j_serialization_schema = JAvroRowSerializationSchema(j_record_class)
-
-        elif avro_schema_string is not None:
-            JAvroRowSerializationSchema = get_gateway().jvm \
-                .org.apache.flink.formats.avro.AvroRowSerializationSchema
-            j_serialization_schema = JAvroRowSerializationSchema(avro_schema_string)
-
-        super(AvroRowSerializationSchema, self).__init__(j_serialization_schema)
-
-
 class Encoder(object):
     """
     Encoder is used by the file sink to perform the actual writing of the
@@ -391,3 +86,27 @@ class Encoder(object):
         j_encoder = get_gateway().jvm.org.apache.flink.api.common.serialization.\
             SimpleStringEncoder(charset_name)
         return Encoder(j_encoder)
+
+
+class BulkWriterFactory(JavaObjectWrapper):
+    """
+    The Python wrapper of Java BulkWriter.Factory interface, which is the base interface for data
+    sinks that write records into files in a bulk manner.
+    """
+
+    def __init__(self, j_bulk_writer_factory):
+        super().__init__(j_bulk_writer_factory)
+
+
+class RowDataBulkWriterFactory(BulkWriterFactory):
+    """
+    A :class:`~BulkWriterFactory` that receives records with RowData type. This is for indicating
+    that Row record from Python must be first converted to RowData.
+    """
+
+    def __init__(self, j_bulk_writer_factory, row_type):
+        super().__init__(j_bulk_writer_factory)
+        self._row_type = row_type
+
+    def get_row_type(self):
+        return self._row_type
diff --git a/flink-python/pyflink/common/tests/test_serialization_schemas.py b/flink-python/pyflink/common/tests/test_serialization_schemas.py
index 9544bd13dba..702241a6039 100644
--- a/flink-python/pyflink/common/tests/test_serialization_schemas.py
+++ b/flink-python/pyflink/common/tests/test_serialization_schemas.py
@@ -15,15 +15,11 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from pyflink.common.serialization import JsonRowSerializationSchema, \
-    JsonRowDeserializationSchema, CsvRowSerializationSchema, CsvRowDeserializationSchema, \
-    SimpleStringSchema
-from pyflink.common.typeinfo import Types
-from pyflink.java_gateway import get_gateway
+from pyflink.common.serialization import SimpleStringSchema
 from pyflink.testing.test_case_utils import PyFlinkTestCase
 
 
-class TestRowSerializationSchemas(PyFlinkTestCase):
+class SimpleStringSchemaTests(PyFlinkTestCase):
 
     def test_simple_string_schema(self):
         expected_string = 'test string'
@@ -33,81 +29,3 @@ class TestRowSerializationSchemas(PyFlinkTestCase):
 
         self.assertEqual(expected_string, simple_string_schema._j_deserialization_schema
                          .deserialize(expected_string.encode(encoding='utf-8')))
-
-    def test_json_row_serialization_deserialization_schema(self):
-        jvm = get_gateway().jvm
-        jsons = ["{\"svt\":\"2020-02-24T12:58:09.209+0800\"}",
-                 "{\"svt\":\"2020-02-24T12:58:09.209+0800\", "
-                 "\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"},\"ids\":[1, 2, 3]}",
-                 "{\"svt\":\"2020-02-24T12:58:09.209+0800\"}"]
-        expected_jsons = ["{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null}",
-                          "{\"svt\":\"2020-02-24T12:58:09.209+0800\","
-                          "\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"},"
-                          "\"ids\":[1,2,3]}",
-                          "{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null}"]
-
-        row_schema = Types.ROW_NAMED(["svt", "ops", "ids"],
-                                     [Types.STRING(),
-                                     Types.ROW_NAMED(['id'], [Types.STRING()]),
-                                     Types.PRIMITIVE_ARRAY(Types.INT())])
-
-        json_row_serialization_schema = JsonRowSerializationSchema.builder() \
-            .with_type_info(row_schema).build()
-        json_row_deserialization_schema = JsonRowDeserializationSchema.builder() \
-            .type_info(row_schema).build()
-        json_row_serialization_schema._j_serialization_schema.open(
-            jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext())
-        json_row_deserialization_schema._j_deserialization_schema.open(
-            jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext())
-
-        for i in range(len(jsons)):
-            j_row = json_row_deserialization_schema._j_deserialization_schema\
-                .deserialize(bytes(jsons[i], encoding='utf-8'))
-            result = str(json_row_serialization_schema._j_serialization_schema.serialize(j_row),
-                         encoding='utf-8')
-            self.assertEqual(expected_jsons[i], result)
-
-    def test_csv_row_serialization_schema(self):
-        jvm = get_gateway().jvm
-        JRow = jvm.org.apache.flink.types.Row
-
-        j_row = JRow(3)
-        j_row.setField(0, "BEGIN")
-        j_row.setField(2, "END")
-
-        def field_assertion(field_info, csv_value, value, field_delimiter):
-            row_info = Types.ROW([Types.STRING(), field_info, Types.STRING()])
-            expected_csv = "BEGIN" + field_delimiter + csv_value + field_delimiter + "END\n"
-            j_row.setField(1, value)
-
-            csv_row_serialization_schema = CsvRowSerializationSchema.Builder(row_info)\
-                .set_escape_character('*').set_quote_character('\'')\
-                .set_array_element_delimiter(':').set_field_delimiter(';').build()
-            csv_row_deserialization_schema = CsvRowDeserializationSchema.Builder(row_info)\
-                .set_escape_character('*').set_quote_character('\'')\
-                .set_array_element_delimiter(':').set_field_delimiter(';').build()
-            csv_row_serialization_schema._j_serialization_schema.open(
-                jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext())
-            csv_row_deserialization_schema._j_deserialization_schema.open(
-                jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext())
-
-            serialized_bytes = csv_row_serialization_schema._j_serialization_schema.serialize(j_row)
-            self.assertEqual(expected_csv, str(serialized_bytes, encoding='utf-8'))
-
-            j_deserialized_row = csv_row_deserialization_schema._j_deserialization_schema\
-                .deserialize(expected_csv.encode("utf-8"))
-            self.assertTrue(j_row.equals(j_deserialized_row))
-
-        field_assertion(Types.STRING(), "'123''4**'", "123'4*", ";")
-        field_assertion(Types.STRING(), "'a;b''c'", "a;b'c", ";")
-        field_assertion(Types.INT(), "12", 12, ";")
-
-        test_j_row = JRow(2)
-        test_j_row.setField(0, "1")
-        test_j_row.setField(1, "hello")
-
-        field_assertion(Types.ROW([Types.STRING(), Types.STRING()]), "'1:hello'", test_j_row, ";")
-        test_j_row.setField(1, "hello world")
-        field_assertion(Types.ROW([Types.STRING(), Types.STRING()]), "'1:hello world'", test_j_row,
-                        ";")
-        field_assertion(Types.STRING(), "null", "null", ";")
diff --git a/flink-python/pyflink/common/utils.py b/flink-python/pyflink/common/utils.py
new file mode 100644
index 00000000000..05e047b64a8
--- /dev/null
+++ b/flink-python/pyflink/common/utils.py
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+class JavaObjectWrapper(object):
+
+    def __init__(self, j_object):
+        self._j_object = j_object
+
+    def get_java_object(self):
+        return self._j_object
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index 436e8677666..b9ce763bd53 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -215,21 +215,22 @@ Classes to define formats used together with source & sink:
     - :class:`formats.csv.CsvReaderFormat`:
       A :class:`~connectors.file_system.StreamFormat` to read CSV files into Row data.
     - :class:`formats.csv.CsvBulkWriter`:
-      Creates :class:`~connectors.file_system.BulkWriterFactory` to write Row data into CSV files.
+      Creates :class:`~pyflink.common.serialization.BulkWriterFactory` to write Row data into CSV
+      files.
     - :class:`formats.avro.GenericRecordAvroTypeInfo`:
       A :class:`~pyflink.common.typeinfo.TypeInformation` to indicate vanilla Python records will be
       translated to GenericRecordAvroTypeInfo on the Java side.
     - :class:`formats.avro.AvroInputFormat`:
-      A :class:`~connector.file_system.InputFormat` to read avro files in a streaming fashion.
+      An InputFormat to read avro files in a streaming fashion.
     - :class:`formats.avro.AvroWriters`:
-      A class to provide :class:`~connector.file_system.BulkWriterFactory` to write vanilla Python
-      objects into avro files in a batch fashion.
+      A class to provide :class:`~pyflink.common.serialization.BulkWriterFactory` to write vanilla
+      Python objects into avro files in a batch fashion.
     - :class:`formats.parquet.ParquetColumnarRowInputFormat`:
       A :class:`~connectors.file_system.BulkFormat` to read columnar parquet files into Row data in
       a batch-processing fashion.
     - :class:`formats.parquet.ParquetBulkWriter`:
-      Convenient builder to create a :class:`~connectors.file_system.BulkWriterFactory` that writes
-      Rows with a defined RowType into Parquet files in a batch fashion.
+      Convenient builder to create a :class:`~pyflink.common.serialization.BulkWriterFactory` that
+      writes Rows with a defined RowType into Parquet files in a batch fashion.
     - :class:`formats.parquet.AvroParquetReaders`:
       A convenience builder to create reader format that reads individual Avro records from a
       Parquet stream. Only GenericRecord is supported in PyFlink.
@@ -237,8 +238,8 @@ Classes to define formats used together with source & sink:
       Convenience builder to create ParquetWriterFactory instances for Avro types. Only
       GenericRecord is supported in PyFlink.
     - :class:`formats.orc.OrcBulkWriters`:
-      Convenient builder to create a :class:`BulkWriterFactory` that writes Row records with a
-      defined :class:`RowType` into Orc files.
+      Convenient builder to create a :class:`~pyflink.common.serialization.BulkWriterFactory` that
+      writes Row records with a defined :class:`RowType` into Orc files.
 
 Other important classes:
 
diff --git a/flink-python/pyflink/datastream/connectors/file_system.py b/flink-python/pyflink/datastream/connectors/file_system.py
index 1a147495a1d..94fb040d3a8 100644
--- a/flink-python/pyflink/datastream/connectors/file_system.py
+++ b/flink-python/pyflink/datastream/connectors/file_system.py
@@ -20,6 +20,8 @@ from abc import abstractmethod
 
 from typing import TYPE_CHECKING, Optional
 
+from pyflink.common.serialization import BulkWriterFactory, RowDataBulkWriterFactory
+
 if TYPE_CHECKING:
     from pyflink.table.types import RowType
 
@@ -27,7 +29,7 @@ from pyflink.common import Duration, Encoder
 from pyflink.datastream.connectors import Source, Sink
 from pyflink.datastream.connectors.base import SupportsPreprocessing, StreamTransformer
 from pyflink.datastream.functions import SinkFunction
-from pyflink.datastream.utils import JavaObjectWrapper
+from pyflink.common.utils import JavaObjectWrapper
 from pyflink.java_gateway import get_gateway
 from pyflink.util.java_utils import to_jarray
 
@@ -39,10 +41,8 @@ __all__ = [
     'FileSourceBuilder',
     'FileSink',
     'StreamingFileSink',
-    'BulkFormat',
     'StreamFormat',
-    'InputFormat',
-    'BulkWriterFactory',
+    'BulkFormat',
     'FileEnumeratorProvider',
     'FileSplitAssignerProvider',
     'RollingPolicy',
@@ -165,40 +165,6 @@ class BulkFormat(object):
         self._j_bulk_format = j_bulk_format
 
 
-class InputFormat(JavaObjectWrapper):
-    """
-    The Python wrapper of Java InputFormat interface, which is the base interface for data sources
-    that produce records.
-    """
-
-    def __init__(self, j_input_format):
-        super().__init__(j_input_format)
-
-
-class BulkWriterFactory(JavaObjectWrapper):
-    """
-    The Python wrapper of Java BulkWriter.Factory interface, which is the base interface for data
-    sinks that write records into files in a bulk manner.
-    """
-
-    def __init__(self, j_bulk_writer_factory):
-        super().__init__(j_bulk_writer_factory)
-
-
-class RowDataBulkWriterFactory(BulkWriterFactory):
-    """
-    A :class:`BulkWriterFactory` that receives records with RowData type. This is for indicating
-    that Row record from Python must be first converted to RowData.
-    """
-
-    def __init__(self, j_bulk_writer_factory, row_type: 'RowType'):
-        super().__init__(j_bulk_writer_factory)
-        self._row_type = row_type
-
-    def get_row_type(self) -> 'RowType':
-        return self._row_type
-
-
 class FileSourceBuilder(object):
     """
     The builder for the :class:`~FileSource`, to configure the various behaviors.
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
index b36e22092bf..cf9990af354 100644
--- a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
+++ b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
@@ -22,9 +22,7 @@ import pyflink.datastream.data_stream as data_stream
 from pyflink.common import typeinfo
 
 from pyflink.common.configuration import Configuration
-from pyflink.common.serialization import SimpleStringSchema, DeserializationSchema, \
-    JsonRowDeserializationSchema, CsvRowDeserializationSchema, AvroRowDeserializationSchema, \
-    JsonRowSerializationSchema, CsvRowSerializationSchema, AvroRowSerializationSchema
+from pyflink.common.serialization import SimpleStringSchema, DeserializationSchema
 from pyflink.common.typeinfo import Types
 from pyflink.common.types import Row
 from pyflink.common.watermark_strategy import WatermarkStrategy
@@ -32,6 +30,9 @@ from pyflink.datastream.connectors.base import DeliveryGuarantee
 from pyflink.datastream.connectors.kafka import KafkaSource, KafkaTopicPartition, \
     KafkaOffsetsInitializer, KafkaOffsetResetStrategy, KafkaRecordSerializationSchema, KafkaSink, \
     FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.formats.avro import AvroRowDeserializationSchema, AvroRowSerializationSchema
+from pyflink.datastream.formats.csv import CsvRowDeserializationSchema, CsvRowSerializationSchema
+from pyflink.datastream.formats.json import JsonRowDeserializationSchema, JsonRowSerializationSchema
 from pyflink.java_gateway import get_gateway
 from pyflink.testing.test_case_utils import (
     PyFlinkStreamingTestCase,
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_rabbitmq.py b/flink-python/pyflink/datastream/connectors/tests/test_rabbitmq.py
index de2fca63076..b49e10d7ddd 100644
--- a/flink-python/pyflink/datastream/connectors/tests/test_rabbitmq.py
+++ b/flink-python/pyflink/datastream/connectors/tests/test_rabbitmq.py
@@ -15,8 +15,9 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from pyflink.common import Types, JsonRowDeserializationSchema, JsonRowSerializationSchema
+from pyflink.common import Types
 from pyflink.datastream.connectors.rabbitmq import RMQSink, RMQSource, RMQConnectionConfig
+from pyflink.datastream.formats.json import JsonRowDeserializationSchema, JsonRowSerializationSchema
 from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
 from pyflink.util.java_utils import get_field_value
 
diff --git a/flink-python/pyflink/datastream/formats/avro.py b/flink-python/pyflink/datastream/formats/avro.py
index 66a16eb9f7d..c60902d8c5e 100644
--- a/flink-python/pyflink/datastream/formats/avro.py
+++ b/flink-python/pyflink/datastream/formats/avro.py
@@ -15,20 +15,24 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from py4j.java_gateway import get_java_class, JavaObject
+from py4j.java_gateway import get_java_class, JavaObject, java_import
+
+from pyflink.common.io import InputFormat
+from pyflink.common.serialization import BulkWriterFactory, SerializationSchema, \
+    DeserializationSchema
 from pyflink.common.typeinfo import TypeInformation
 
-from pyflink.datastream.connectors.file_system import InputFormat, BulkWriterFactory
 from pyflink.datastream.utils import ResultTypeQueryable
 from pyflink.java_gateway import get_gateway
-from pyflink.util.java_utils import get_field_value
-
+from pyflink.util.java_utils import get_field_value, load_java_class
 
 __all__ = [
-    'AvroInputFormat',
     'AvroSchema',
-    'AvroWriters',
-    'GenericRecordAvroTypeInfo'
+    'GenericRecordAvroTypeInfo',
+    'AvroInputFormat',
+    'AvroBulkWriters',
+    'AvroRowDeserializationSchema',
+    'AvroRowSerializationSchema'
 ]
 
 
@@ -85,7 +89,7 @@ class GenericRecordAvroTypeInfo(TypeInformation):
     """
 
     def __init__(self, schema: 'AvroSchema'):
-        super().__init__()
+        super(GenericRecordAvroTypeInfo, self).__init__()
         self._schema = schema
         self._j_typeinfo = get_gateway().jvm.org.apache.flink.formats.avro.typeutils \
             .GenericRecordAvroTypeInfo(schema._j_schema)
@@ -118,16 +122,17 @@ class AvroInputFormat(InputFormat, ResultTypeQueryable):
             jvm.org.apache.flink.core.fs.Path(path),
             get_java_class(jvm.org.apache.flink.avro.shaded.org.apache.avro.generic.GenericRecord)
         )
-        super().__init__(j_avro_input_format)
+        super(AvroInputFormat, self).__init__(j_avro_input_format)
         self._type_info = GenericRecordAvroTypeInfo(schema)
 
     def get_produced_type(self) -> GenericRecordAvroTypeInfo:
         return self._type_info
 
 
-class AvroWriters(object):
+class AvroBulkWriters(object):
     """
-    Convenience builder to create AvroWriterFactory instances.
+    Convenience builder to create :class:`~pyflink.common.serialization.BulkWriterFactory` for
+    Avro types.
 
     .. versionadded:: 1.16.0
     """
@@ -139,7 +144,7 @@ class AvroWriters(object):
         will use the given schema to build and write the records.
 
         Note that to make this works in PyFlink, you need to declare the output type of the
-        predecessor before FileSink to be :class:`GenericRecordAvroTypeInfo`, and the predecessor
+        predecessor before FileSink to be :class:`~GenericRecordAvroTypeInfo`, and the predecessor
         cannot be :meth:`StreamExecutionEnvironment.from_collection`, you can add a pass-through map
         function before the sink, as the example shown below.
 
@@ -155,7 +160,7 @@ class AvroWriters(object):
             >>> avro_type_info = GenericRecordAvroTypeInfo(schema)
             >>> ds = env.from_collection([{'array': [1, 2]}], type_info=Types.PICKLED_BYTE_ARRAY())
             >>> sink = FileSink.for_bulk_format(
-            ...     OUTPUT_DIR, AvroWriters.for_generic_record(schema)).build()
+            ...     OUTPUT_DIR, AvroBulkWriters.for_generic_record(schema)).build()
             >>> # A map to indicate its Avro type info is necessary for serialization
             >>> ds.map(lambda e: e, output_type=GenericRecordAvroTypeInfo(schema)) \\
             ...     .sink_to(sink)
@@ -168,3 +173,71 @@ class AvroWriters(object):
             schema._j_schema
         )
         return BulkWriterFactory(j_bulk_writer_factory)
+
+
+class AvroRowDeserializationSchema(DeserializationSchema):
+    """
+    Deserialization schema from Avro bytes to Row. Deserializes the byte[] messages into (nested)
+    Flink rows. It converts Avro types into types that are compatible with Flink's Table & SQL API.
+
+    Projects with Avro records containing logical date/time types need to add a JodaTime dependency.
+    """
+    def __init__(self, record_class: str = None, avro_schema_string: str = None):
+        """
+        Creates an Avro deserialization schema for the given specific record class or Avro schema
+        string. Having the concrete Avro record class might improve performance.
+
+        :param record_class: Avro record class used to deserialize Avro's record to Flink's row.
+        :param avro_schema_string: Avro schema string to deserialize Avro's record to Flink's row.
+        """
+
+        if avro_schema_string is None and record_class is None:
+            raise TypeError("record_class or avro_schema_string should be specified.")
+        j_deserialization_schema = None
+        if record_class is not None:
+            gateway = get_gateway()
+            java_import(gateway.jvm, record_class)
+            j_record_class = load_java_class(record_class)
+            JAvroRowDeserializationSchema = get_gateway().jvm \
+                .org.apache.flink.formats.avro.AvroRowDeserializationSchema
+            j_deserialization_schema = JAvroRowDeserializationSchema(j_record_class)
+
+        elif avro_schema_string is not None:
+            JAvroRowDeserializationSchema = get_gateway().jvm \
+                .org.apache.flink.formats.avro.AvroRowDeserializationSchema
+            j_deserialization_schema = JAvroRowDeserializationSchema(avro_schema_string)
+
+        super(AvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
+
+
+class AvroRowSerializationSchema(SerializationSchema):
+    """
+    Serialization schema that serializes to Avro binary format.
+    """
+
+    def __init__(self, record_class: str = None, avro_schema_string: str = None):
+        """
+        Creates AvroSerializationSchema that serializes SpecificRecord using provided schema or
+        record class.
+
+        :param record_class: Avro record class used to serialize Flink's row to Avro's record.
+        :param avro_schema_string: Avro schema string to serialize Flink's row to Avro's record.
+        """
+        if avro_schema_string is None and record_class is None:
+            raise TypeError("record_class or avro_schema_string should be specified.")
+
+        j_serialization_schema = None
+        if record_class is not None:
+            gateway = get_gateway()
+            java_import(gateway.jvm, record_class)
+            j_record_class = load_java_class(record_class)
+            JAvroRowSerializationSchema = get_gateway().jvm \
+                .org.apache.flink.formats.avro.AvroRowSerializationSchema
+            j_serialization_schema = JAvroRowSerializationSchema(j_record_class)
+
+        elif avro_schema_string is not None:
+            JAvroRowSerializationSchema = get_gateway().jvm \
+                .org.apache.flink.formats.avro.AvroRowSerializationSchema
+            j_serialization_schema = JAvroRowSerializationSchema(avro_schema_string)
+
+        super(AvroRowSerializationSchema, self).__init__(j_serialization_schema)
diff --git a/flink-python/pyflink/datastream/formats/csv.py b/flink-python/pyflink/datastream/formats/csv.py
index a0d0f803440..377319493ac 100644
--- a/flink-python/pyflink/datastream/formats/csv.py
+++ b/flink-python/pyflink/datastream/formats/csv.py
@@ -15,19 +15,24 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
 
-from pyflink.common.typeinfo import _from_java_type
-from pyflink.datastream.connectors.file_system import BulkWriterFactory, RowDataBulkWriterFactory, \
-    StreamFormat
+from pyflink.common.serialization import BulkWriterFactory, RowDataBulkWriterFactory, \
+    SerializationSchema, DeserializationSchema
+from pyflink.common.typeinfo import _from_java_type, TypeInformation
+from pyflink.datastream.connectors.file_system import StreamFormat
 from pyflink.java_gateway import get_gateway
-from pyflink.table.types import DataType, DataTypes, _to_java_data_type, RowType, NumericType
+
+if TYPE_CHECKING:
+    from pyflink.table.types import DataType, RowType, NumericType
 
 __all__ = [
-    'CsvBulkWriter',
-    'CsvReaderFormat',
     'CsvSchema',
-    'CsvSchemaBuilder'
+    'CsvSchemaBuilder',
+    'CsvReaderFormat',
+    'CsvBulkWriters',
+    'CsvRowDeserializationSchema',
+    'CsvRowSerializationSchema'
 ]
 
 
@@ -39,9 +44,9 @@ class CsvSchema(object):
     .. versionadded:: 1.16.0
     """
 
-    def __init__(self, j_schema, row_type: RowType):
+    def __init__(self, j_schema, row_type: 'RowType'):
         self._j_schema = j_schema
-        self._row_type = row_type  # type: RowType
+        self._row_type = row_type
         self._type_info = None
 
     @staticmethod
@@ -53,6 +58,7 @@ class CsvSchema(object):
 
     def get_type_info(self):
         if self._type_info is None:
+            from pyflink.table.types import _to_java_data_type
             jvm = get_gateway().jvm
             j_type_info = jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter \
                 .toLegacyTypeInfo(_to_java_data_type(self._row_type))
@@ -74,7 +80,7 @@ class CsvSchema(object):
 
 class CsvSchemaBuilder(object):
     """
-    CsvSchemaBuilder is for building a :class:`CsvSchemaBuilder`, corresponding to Java
+    CsvSchemaBuilder is for building a :class:`~CsvSchema`, corresponding to Java
     ``com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder`` class.
 
     .. versionadded:: 1.16.0
@@ -88,14 +94,15 @@ class CsvSchemaBuilder(object):
 
     def build(self) -> 'CsvSchema':
         """
-        Build the :class:`CsvSchema`.
+        Build the :class:`~CsvSchema`.
         """
+        from pyflink.table.types import DataTypes
         return CsvSchema(self._j_schema_builder.build(), DataTypes.ROW(self._fields))
 
     def add_array_column(self,
                          name: str,
                          separator: str = ';',
-                         element_type: Optional[DataType] = DataTypes.STRING()) \
+                         element_type: Optional['DataType'] = None) \
             -> 'CsvSchemaBuilder':
         """
         Add an array column to schema, the type of elements could be specified via ``element_type``,
@@ -105,6 +112,9 @@ class CsvSchemaBuilder(object):
         :param separator: Text separator of array elements, default to ``;``.
         :param element_type: DataType of array elements, default to ``DataTypes.STRING()``.
         """
+        from pyflink.table.types import DataTypes
+        if element_type is None:
+            element_type = DataTypes.STRING()
         self._j_schema_builder.addArrayColumn(name, separator)
         self._fields.append(DataTypes.FIELD(name, DataTypes.ARRAY(element_type)))
         return self
@@ -115,12 +125,13 @@ class CsvSchemaBuilder(object):
 
         :param name: Name of the column.
         """
+        from pyflink.table.types import DataTypes
         self._j_schema_builder.addBooleanColumn(name)
         self._fields.append(DataTypes.FIELD(name, DataTypes.BOOLEAN()))
         return self
 
     def add_number_column(self, name: str,
-                          number_type: Optional[NumericType] = DataTypes.BIGINT()) \
+                          number_type: Optional['NumericType'] = None) \
             -> 'CsvSchemaBuilder':
         """
         Add a number column to schema, the type of number could be specified via ``number_type``.
@@ -128,6 +139,9 @@ class CsvSchemaBuilder(object):
         :param name: Name of the column.
         :param number_type: DataType of the number, default to ``DataTypes.BIGINT()``.
         """
+        from pyflink.table.types import DataTypes
+        if number_type is None:
+            number_type = DataTypes.BIGINT()
         self._j_schema_builder.addNumberColumn(name)
         self._fields.append(DataTypes.FIELD(name, number_type))
         return self
@@ -138,6 +152,7 @@ class CsvSchemaBuilder(object):
 
         :param name: Name of the column.
         """
+        from pyflink.table.types import DataTypes
         self._j_schema_builder.addColumn(name)
         self._fields.append(DataTypes.FIELD(name, DataTypes.STRING()))
         return self
@@ -279,7 +294,7 @@ class CsvSchemaBuilder(object):
 
 class CsvReaderFormat(StreamFormat):
     """
-    The :class:`StreamFormat` for reading csv files.
+    The :class:`~StreamFormat` for reading csv files.
 
     Example:
     ::
@@ -310,6 +325,7 @@ class CsvReaderFormat(StreamFormat):
         """
         Builds a :class:`CsvReaderFormat` using `CsvSchema`.
         """
+        from pyflink.table.types import _to_java_data_type
         jvm = get_gateway().jvm
         j_csv_format = jvm.org.apache.flink.formats.csv.PythonCsvUtils \
             .createCsvReaderFormat(
@@ -319,10 +335,10 @@ class CsvReaderFormat(StreamFormat):
         return CsvReaderFormat(j_csv_format)
 
 
-class CsvBulkWriter(object):
+class CsvBulkWriters(object):
     """
-    CsvBulkWriter is for building :class:`BulkWriterFactory` to write Rows with a predefined CSV
-    schema to partitioned files in a bulk fashion.
+    CsvBulkWriters is for building :class:`~pyflink.common.serialization.BulkWriterFactory` to
+    write Rows with a predefined CSV schema to partitioned files in a bulk fashion.
 
     Example:
     ::
@@ -334,7 +350,7 @@ class CsvBulkWriter(object):
         ...     .set_column_separator('|') \\
         ...     .build()
         >>> sink = FileSink.for_bulk_format(
-        ...     OUTPUT_DIR, CsvBulkWriter.for_schema(schema)).build()
+        ...     OUTPUT_DIR, CsvBulkWriters.for_schema(schema)).build()
         >>> # If ds is a source stream, an identity map before sink is required
         >>> ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
 
@@ -344,8 +360,11 @@ class CsvBulkWriter(object):
     @staticmethod
     def for_schema(schema: 'CsvSchema') -> 'BulkWriterFactory':
         """
-        Builds a :class:`BulkWriterFactory` for writing records to files in CSV format.
+        Creates a :class:`~pyflink.common.serialization.BulkWriterFactory` for writing records to
+        files in CSV format.
         """
+        from pyflink.table.types import _to_java_data_type
+
         jvm = get_gateway().jvm
         csv = jvm.org.apache.flink.formats.csv
 
@@ -353,3 +372,114 @@ class CsvBulkWriter(object):
             schema._j_schema,
             _to_java_data_type(schema._row_type))
         return RowDataBulkWriterFactory(j_factory, schema._row_type)
+
+
+class CsvRowDeserializationSchema(DeserializationSchema):
+    """
+    Deserialization schema from CSV to Flink types. Deserializes a byte[] message as a JsonNode and
+    converts it to Row.
+
+    Failures during deserialization are forwarded as wrapped IOExceptions.
+    """
+
+    def __init__(self, j_deserialization_schema):
+        super(CsvRowDeserializationSchema, self).__init__(
+            j_deserialization_schema=j_deserialization_schema)
+
+    class Builder(object):
+        """
+        A builder for creating a CsvRowDeserializationSchema.
+        """
+        def __init__(self, type_info: TypeInformation):
+            if type_info is None:
+                raise TypeError("Type information must not be None")
+            self._j_builder = get_gateway().jvm\
+                .org.apache.flink.formats.csv.CsvRowDeserializationSchema.Builder(
+                type_info.get_java_type_info())
+
+        def set_field_delimiter(self, delimiter: str):
+            self._j_builder = self._j_builder.setFieldDelimiter(delimiter)
+            return self
+
+        def set_allow_comments(self, allow_comments: bool):
+            self._j_builder = self._j_builder.setAllowComments(allow_comments)
+            return self
+
+        def set_array_element_delimiter(self, delimiter: str):
+            self._j_builder = self._j_builder.setArrayElementDelimiter(delimiter)
+            return self
+
+        def set_quote_character(self, c: str):
+            self._j_builder = self._j_builder.setQuoteCharacter(c)
+            return self
+
+        def set_escape_character(self, c: str):
+            self._j_builder = self._j_builder.setEscapeCharacter(c)
+            return self
+
+        def set_null_literal(self, null_literal: str):
+            self._j_builder = self._j_builder.setNullLiteral(null_literal)
+            return self
+
+        def set_ignore_parse_errors(self, ignore_parse_errors: bool):
+            self._j_builder = self._j_builder.setIgnoreParseErrors(ignore_parse_errors)
+            return self
+
+        def build(self):
+            j_csv_row_deserialization_schema = self._j_builder.build()
+            return CsvRowDeserializationSchema(
+                j_deserialization_schema=j_csv_row_deserialization_schema)
+
+
+class CsvRowSerializationSchema(SerializationSchema):
+    """
+    Serialization schema that serializes an object of Flink types into CSV bytes. Serializes the
+    input row into an ObjectNode and converts it into byte[].
+
+    Result byte[] messages can be deserialized using CsvRowDeserializationSchema.
+    """
+    def __init__(self, j_csv_row_serialization_schema):
+        super(CsvRowSerializationSchema, self).__init__(j_csv_row_serialization_schema)
+
+    class Builder(object):
+        """
+        A builder for creating a CsvRowSerializationSchema.
+        """
+        def __init__(self, type_info: TypeInformation):
+            if type_info is None:
+                raise TypeError("Type information must not be None")
+            self._j_builder = get_gateway().jvm\
+                .org.apache.flink.formats.csv.CsvRowSerializationSchema.Builder(
+                type_info.get_java_type_info())
+
+        def set_field_delimiter(self, c: str):
+            self._j_builder = self._j_builder.setFieldDelimiter(c)
+            return self
+
+        def set_line_delimiter(self, delimiter: str):
+            self._j_builder = self._j_builder.setLineDelimiter(delimiter)
+            return self
+
+        def set_array_element_delimiter(self, delimiter: str):
+            self._j_builder = self._j_builder.setArrayElementDelimiter(delimiter)
+            return self
+
+        def disable_quote_character(self):
+            self._j_builder = self._j_builder.disableQuoteCharacter()
+            return self
+
+        def set_quote_character(self, c: str):
+            self._j_builder = self._j_builder.setQuoteCharacter(c)
+            return self
+
+        def set_escape_character(self, c: str):
+            self._j_builder = self._j_builder.setEscapeCharacter(c)
+            return self
+
+        def set_null_literal(self, s: str):
+            self._j_builder = self._j_builder.setNullLiteral(s)
+            return self
+
+        def build(self):
+            j_serialization_schema = self._j_builder.build()
+            return CsvRowSerializationSchema(j_csv_row_serialization_schema=j_serialization_schema)
diff --git a/flink-python/pyflink/datastream/formats/json.py b/flink-python/pyflink/datastream/formats/json.py
new file mode 100644
index 00000000000..4cc216e28cd
--- /dev/null
+++ b/flink-python/pyflink/datastream/formats/json.py
@@ -0,0 +1,150 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common import SerializationSchema, TypeInformation, typeinfo, DeserializationSchema
+from pyflink.java_gateway import get_gateway
+
+
+__all__ = [
+    'JsonRowDeserializationSchema',
+    'JsonRowSerializationSchema'
+]
+
+
+class JsonRowDeserializationSchema(DeserializationSchema):
+    """
+    Deserialization schema from JSON to Flink types.
+
+    Deserializes a byte[] message as a JSON object and reads the specified fields.
+
+    Failures during deserialization are forwarded as wrapped IOExceptions.
+    """
+    def __init__(self, j_deserialization_schema):
+        super(JsonRowDeserializationSchema, self).__init__(j_deserialization_schema)
+
+    @staticmethod
+    def builder():
+        """
+        A static method to get a Builder for JsonRowDeserializationSchema.
+        """
+        return JsonRowDeserializationSchema.Builder()
+
+    class Builder(object):
+        """
+        Builder for JsonRowDeserializationSchema.
+        """
+
+        def __init__(self):
+            self._type_info = None
+            self._fail_on_missing_field = False
+            self._ignore_parse_errors = False
+
+        def type_info(self, type_info: TypeInformation):
+            """
+            Creates a JSON deserialization schema for the given type information.
+
+            :param type_info: Type information describing the result type. The field names of Row
+                              are used to parse the JSON properties.
+            """
+            self._type_info = type_info
+            return self
+
+        def json_schema(self, json_schema: str):
+            """
+            Creates a JSON deserialization schema for the given JSON schema.
+
+            :param json_schema: JSON schema describing the result type.
+            """
+            if json_schema is None:
+                raise TypeError("The json_schema must not be None.")
+            j_type_info = get_gateway().jvm \
+                .org.apache.flink.formats.json.JsonRowSchemaConverter.convert(json_schema)
+            self._type_info = typeinfo._from_java_type(j_type_info)
+            return self
+
+        def fail_on_missing_field(self):
+            """
+            Configures schema to fail if a JSON field is missing. A missing field is ignored and the
+            field is set to null by default.
+            """
+            self._fail_on_missing_field = True
+            return self
+
+        def ignore_parse_errors(self):
+            """
+            Configures schema to ignore JSON parse errors instead of failing. By default, an
+            exception will be thrown when parsing json fails.
+            """
+            self._ignore_parse_errors = True
+            return self
+
+        def build(self):
+            JBuilder = get_gateway().jvm.org.apache.flink.formats.json.JsonRowDeserializationSchema\
+                .Builder
+            j_builder = JBuilder(self._type_info.get_java_type_info())
+            # Forward the optional flags recorded by the fluent setters to the Java builder.
+            if self._fail_on_missing_field:
+                j_builder = j_builder.failOnMissingField()
+
+            if self._ignore_parse_errors:
+                j_builder = j_builder.ignoreParseErrors()
+            # Wrap the built Java schema in its Python counterpart.
+            j_deserialization_schema = j_builder.build()
+            return JsonRowDeserializationSchema(j_deserialization_schema=j_deserialization_schema)
+
+
+class JsonRowSerializationSchema(SerializationSchema):
+    """
+    Serialization schema that serializes an object of Flink types into JSON bytes. Serializes the
+    input Flink object into a JSON string and converts it into byte[].
+
+    Result byte[] message can be deserialized using JsonRowDeserializationSchema.
+    """
+
+    def __init__(self, j_serialization_schema):
+        super(JsonRowSerializationSchema, self).__init__(j_serialization_schema)
+
+    @staticmethod
+    def builder():
+        return JsonRowSerializationSchema.Builder()
+
+    class Builder(object):
+        """
+        Builder for JsonRowSerializationSchema.
+        """
+        def __init__(self):
+            self._type_info = None
+
+        def with_type_info(self, type_info: TypeInformation):
+            """
+            Creates a JSON serialization schema for the given type information.
+
+            :param type_info: Type information describing the result type. The field names of Row
+                              are used to parse the JSON properties.
+            """
+            self._type_info = type_info
+            return self
+
+        def build(self):
+            if self._type_info is None:
+                raise TypeError("Typeinfo should be set.")
+
+            j_builder = get_gateway().jvm \
+                .org.apache.flink.formats.json.JsonRowSerializationSchema.builder()
+
+            j_schema = j_builder.withTypeInfo(self._type_info.get_java_type_info()).build()
+            return JsonRowSerializationSchema(j_serialization_schema=j_schema)
diff --git a/flink-python/pyflink/datastream/formats/orc.py b/flink-python/pyflink/datastream/formats/orc.py
index a34093d8fa7..459690dadbe 100644
--- a/flink-python/pyflink/datastream/formats/orc.py
+++ b/flink-python/pyflink/datastream/formats/orc.py
@@ -15,15 +15,17 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
 
 from pyflink.common import Configuration
-from pyflink.datastream.connectors.file_system import BulkWriterFactory, RowDataBulkWriterFactory
+from pyflink.common.serialization import BulkWriterFactory, RowDataBulkWriterFactory
 from pyflink.datastream.utils import create_hadoop_configuration, create_java_properties
 from pyflink.java_gateway import get_gateway
-from pyflink.table.types import _to_java_data_type, RowType
 from pyflink.util.java_utils import to_jarray
 
+if TYPE_CHECKING:
+    from pyflink.table.types import RowType
+
 __all__ = [
     'OrcBulkWriters'
 ]
@@ -31,20 +33,20 @@ __all__ = [
 
 class OrcBulkWriters(object):
     """
-    Convenient builder to create a :class:`~connectors.file_system.BulkWriterFactory` that writes
-    Row records with a defined RowType into Orc files in a batch fashion.
+    Convenient builder to create a :class:`~pyflink.common.serialization.BulkWriterFactory` that
+    writes records with a predefined schema into Orc files in a batch fashion.
 
     .. versionadded:: 1.16.0
     """
 
     @staticmethod
-    def for_row_type(row_type: RowType,
+    def for_row_type(row_type: 'RowType',
                      writer_properties: Optional[Configuration] = None,
                      hadoop_config: Optional[Configuration] = None) \
             -> BulkWriterFactory:
         """
-        Create a RowDataBulkWriterFactory that writes Row records with a defined RowType into Orc
-        files in a batch fashion.
+        Create a :class:`~pyflink.common.serialization.BulkWriterFactory` that writes records
+        with a predefined schema into Orc files in a batch fashion.
 
         Example:
         ::
@@ -69,10 +71,16 @@ class OrcBulkWriters(object):
         Note that in the above example, an identity map to indicate its RowTypeInfo is necessary
         before ``sink_to`` when ``ds`` is a source stream producing **RowData** records,
         because RowDataBulkWriterFactory assumes the input record type is Row.
+
+        :param row_type: Row type of orc table.
+        :param writer_properties: Properties that can be used in ORC WriterOptions.
+        :param hadoop_config: Hadoop configurations used in ORC WriterOptions.
         """
+        from pyflink.table.types import RowType
         if not isinstance(row_type, RowType):
             raise TypeError('row_type must be an instance of RowType')
 
+        from pyflink.table.types import _to_java_data_type
         j_data_type = _to_java_data_type(row_type)
         jvm = get_gateway().jvm
         j_row_type = j_data_type.getLogicalType()
diff --git a/flink-python/pyflink/datastream/formats/parquet.py b/flink-python/pyflink/datastream/formats/parquet.py
index 5d1164b82b8..83aef589f9f 100644
--- a/flink-python/pyflink/datastream/formats/parquet.py
+++ b/flink-python/pyflink/datastream/formats/parquet.py
@@ -15,22 +15,23 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
 
 from pyflink.common import Configuration
-from pyflink.datastream.connectors.file_system import StreamFormat, BulkFormat, BulkWriterFactory, \
-    RowDataBulkWriterFactory
+from pyflink.common.serialization import BulkWriterFactory, RowDataBulkWriterFactory
+from pyflink.datastream.connectors.file_system import StreamFormat, BulkFormat
 from pyflink.datastream.formats.avro import AvroSchema
 from pyflink.datastream.utils import create_hadoop_configuration
 from pyflink.java_gateway import get_gateway
-from pyflink.table.types import RowType, _to_java_data_type
 
+if TYPE_CHECKING:
+    from pyflink.table.types import RowType
 
 __all__ = [
     'AvroParquetReaders',
     'AvroParquetWriters',
-    'ParquetBulkWriter',
     'ParquetColumnarRowInputFormat',
+    'ParquetBulkWriters'
 ]
 
 
@@ -142,7 +143,7 @@ class ParquetColumnarRowInputFormat(BulkFormat):
     """
 
     def __init__(self,
-                 row_type: RowType,
+                 row_type: 'RowType',
                  hadoop_config: Optional[Configuration] = None,
                  batch_size: int = 2048,
                  is_utc_timestamp: bool = False,
@@ -150,6 +151,7 @@ class ParquetColumnarRowInputFormat(BulkFormat):
         if not hadoop_config:
             hadoop_config = Configuration()
 
+        from pyflink.table.types import _to_java_data_type
         jvm = get_gateway().jvm
         j_row_type = _to_java_data_type(row_type).getLogicalType()
         produced_type_info = jvm.org.apache.flink.table.runtime.typeutils. \
@@ -161,20 +163,21 @@ class ParquetColumnarRowInputFormat(BulkFormat):
         super().__init__(j_parquet_columnar_format)
 
 
-class ParquetBulkWriter(object):
+class ParquetBulkWriters(object):
     """
-    Convenient builder to create a :class:`BulkWriterFactory` that writes Rows with a defined
-    RowType into Parquet files in a batch fashion.
+    Convenient builder to create a :class:`~pyflink.common.serialization.BulkWriterFactory` that
+    writes records with a predefined schema into Parquet files in a batch fashion.
 
     .. versionadded:: 1.16.0
     """
 
     @staticmethod
-    def for_row_type(row_type: RowType, hadoop_config: Optional[Configuration] = None,
+    def for_row_type(row_type: 'RowType',
+                     hadoop_config: Optional[Configuration] = None,
                      utc_timestamp: bool = False) -> 'BulkWriterFactory':
         """
-        Create a RowDataBulkWriterFactory that writes Rows records with a defined RowType into
-        Parquet files in a batch fashion.
+        Create a :class:`~pyflink.common.serialization.BulkWriterFactory` that writes records
+        with a predefined schema into Parquet files in a batch fashion.
 
         Example:
         ::
@@ -188,7 +191,7 @@ class ParquetBulkWriter(object):
             ...     [Types.STRING(), Types.LIST(Types.INT())]
             ... )
             >>> sink = FileSink.for_bulk_format(
-            ...     OUTPUT_DIR, ParquetBulkWriter.for_row_type(
+            ...     OUTPUT_DIR, ParquetBulkWriters.for_row_type(
             ...         row_type,
             ...         hadoop_config=Configuration(),
             ...         utc_timestamp=True,
@@ -198,11 +201,17 @@ class ParquetBulkWriter(object):
 
         Note that in the above example, an identity map to indicate its RowTypeInfo is necessary
         before ``sink_to`` when ``ds`` is a source stream producing **RowData** records, because
-        RowDataBulkWriterFactory assumes the input record type is **Row** .
+        RowDataBulkWriterFactory assumes the input record type is **Row**.
+
+        :param row_type: Row type of parquet table.
+        :param hadoop_config: Hadoop configurations.
+        :param utc_timestamp: Whether to use UTC timezone or local timezone to the conversion
+                              between epoch time and LocalDateTime.
         """
         if not hadoop_config:
             hadoop_config = Configuration()
 
+        from pyflink.table.types import _to_java_data_type
         jvm = get_gateway().jvm
         JParquetRowDataBuilder = jvm.org.apache.flink.formats.parquet.row.ParquetRowDataBuilder
         return RowDataBulkWriterFactory(JParquetRowDataBuilder.createWriterFactory(
diff --git a/flink-python/pyflink/datastream/formats/tests/test_avro.py b/flink-python/pyflink/datastream/formats/tests/test_avro.py
index 01e7c1ea66a..bbc1d4c932b 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_avro.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_avro.py
@@ -26,8 +26,8 @@ from py4j.java_gateway import JavaObject, java_import
 
 from pyflink.datastream import MapFunction
 from pyflink.datastream.connectors.file_system import FileSink
-from pyflink.datastream.formats.avro import AvroSchema, GenericRecordAvroTypeInfo, AvroWriters, \
-    AvroInputFormat
+from pyflink.datastream.formats.avro import AvroSchema, GenericRecordAvroTypeInfo, \
+    AvroBulkWriters, AvroInputFormat
 from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
 from pyflink.java_gateway import get_gateway
 from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
@@ -144,7 +144,7 @@ class FileSinkAvroWritersTests(PyFlinkStreamingTestCase):
     def _build_avro_job(self, schema, objects):
         ds = self.env.from_collection(objects)
         sink = FileSink.for_bulk_format(
-            self.avro_dir_name, AvroWriters.for_generic_record(schema)
+            self.avro_dir_name, AvroBulkWriters.for_generic_record(schema)
         ).build()
         ds.map(lambda e: e, output_type=GenericRecordAvroTypeInfo(schema)).sink_to(sink)
 
diff --git a/flink-python/pyflink/datastream/formats/tests/test_csv.py b/flink-python/pyflink/datastream/formats/tests/test_csv.py
index e4b14a9ec35..c2756923446 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_csv.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_csv.py
@@ -23,10 +23,12 @@ from typing import Tuple, List
 from pyflink.common import WatermarkStrategy, Types
 from pyflink.datastream import MapFunction
 from pyflink.datastream.connectors.file_system import FileSource, FileSink
-from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat, CsvBulkWriter
+from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat, CsvBulkWriters, \
+    CsvRowSerializationSchema, CsvRowDeserializationSchema
 from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
+from pyflink.java_gateway import get_gateway
 from pyflink.table import DataTypes
-from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
+from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase, PyFlinkTestCase
 
 
 class FileSourceCsvReaderFormatTests(PyFlinkStreamingTestCase):
@@ -163,7 +165,7 @@ class FileSinkCsvBulkWriterTests(PyFlinkStreamingTestCase):
         ).build()
         ds = self.env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')
         sink = FileSink.for_bulk_format(
-            self.csv_dir_name, CsvBulkWriter.for_schema(schema)
+            self.csv_dir_name, CsvBulkWriters.for_schema(schema)
         ).build()
         ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
 
@@ -175,6 +177,54 @@ class FileSinkCsvBulkWriterTests(PyFlinkStreamingTestCase):
         return lines
 
 
+class JsonSerializationSchemaTests(PyFlinkTestCase):
+
+    def test_csv_row_serialization_schema(self):
+        jvm = get_gateway().jvm
+        JRow = jvm.org.apache.flink.types.Row
+
+        j_row = JRow(3)
+        j_row.setField(0, "BEGIN")
+        j_row.setField(2, "END")
+
+        def field_assertion(field_info, csv_value, value, field_delimiter):
+            row_info = Types.ROW([Types.STRING(), field_info, Types.STRING()])
+            expected_csv = "BEGIN" + field_delimiter + csv_value + field_delimiter + "END\n"
+            j_row.setField(1, value)
+
+            csv_row_serialization_schema = CsvRowSerializationSchema.Builder(row_info)\
+                .set_escape_character('*').set_quote_character('\'')\
+                .set_array_element_delimiter(':').set_field_delimiter(';').build()
+            csv_row_deserialization_schema = CsvRowDeserializationSchema.Builder(row_info)\
+                .set_escape_character('*').set_quote_character('\'')\
+                .set_array_element_delimiter(':').set_field_delimiter(';').build()
+            csv_row_serialization_schema._j_serialization_schema.open(
+                jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext())
+            csv_row_deserialization_schema._j_deserialization_schema.open(
+                jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext())
+
+            serialized_bytes = csv_row_serialization_schema._j_serialization_schema.serialize(j_row)
+            self.assertEqual(expected_csv, str(serialized_bytes, encoding='utf-8'))
+
+            j_deserialized_row = csv_row_deserialization_schema._j_deserialization_schema\
+                .deserialize(expected_csv.encode("utf-8"))
+            self.assertTrue(j_row.equals(j_deserialized_row))
+
+        field_assertion(Types.STRING(), "'123''4**'", "123'4*", ";")
+        field_assertion(Types.STRING(), "'a;b''c'", "a;b'c", ";")
+        field_assertion(Types.INT(), "12", 12, ";")
+
+        test_j_row = JRow(2)
+        test_j_row.setField(0, "1")
+        test_j_row.setField(1, "hello")
+
+        field_assertion(Types.ROW([Types.STRING(), Types.STRING()]), "'1:hello'", test_j_row, ";")
+        test_j_row.setField(1, "hello world")
+        field_assertion(Types.ROW([Types.STRING(), Types.STRING()]), "'1:hello world'", test_j_row,
+                        ";")
+        field_assertion(Types.STRING(), "null", "null", ";")
+
+
 class PassThroughMapFunction(MapFunction):
 
     def map(self, value):
diff --git a/flink-python/pyflink/datastream/formats/tests/test_json.py b/flink-python/pyflink/datastream/formats/tests/test_json.py
new file mode 100644
index 00000000000..e8fc02bda05
--- /dev/null
+++ b/flink-python/pyflink/datastream/formats/tests/test_json.py
@@ -0,0 +1,57 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common import Types
+from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema
+from pyflink.java_gateway import get_gateway
+from pyflink.testing.test_case_utils import PyFlinkTestCase
+
+
+class JsonSerializationSchemaTests(PyFlinkTestCase):
+
+    def test_json_row_serialization_deserialization_schema(self):
+        jvm = get_gateway().jvm
+        jsons = ["{\"svt\":\"2020-02-24T12:58:09.209+0800\"}",
+                 "{\"svt\":\"2020-02-24T12:58:09.209+0800\", "
+                 "\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"},\"ids\":[1, 2, 3]}",
+                 "{\"svt\":\"2020-02-24T12:58:09.209+0800\"}"]
+        expected_jsons = ["{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null}",
+                          "{\"svt\":\"2020-02-24T12:58:09.209+0800\","
+                          "\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"},"
+                          "\"ids\":[1,2,3]}",
+                          "{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null}"]
+
+        row_schema = Types.ROW_NAMED(["svt", "ops", "ids"],
+                                     [Types.STRING(),
+                                     Types.ROW_NAMED(['id'], [Types.STRING()]),
+                                     Types.PRIMITIVE_ARRAY(Types.INT())])
+
+        json_row_serialization_schema = JsonRowSerializationSchema.builder() \
+            .with_type_info(row_schema).build()
+        json_row_deserialization_schema = JsonRowDeserializationSchema.builder() \
+            .type_info(row_schema).build()
+        json_row_serialization_schema._j_serialization_schema.open(
+            jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext())
+        json_row_deserialization_schema._j_deserialization_schema.open(
+            jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext())
+
+        for i in range(len(jsons)):
+            j_row = json_row_deserialization_schema._j_deserialization_schema\
+                .deserialize(bytes(jsons[i], encoding='utf-8'))
+            result = str(json_row_serialization_schema._j_serialization_schema.serialize(j_row),
+                         encoding='utf-8')
+            self.assertEqual(expected_jsons[i], result)
diff --git a/flink-python/pyflink/datastream/formats/tests/test_parquet.py b/flink-python/pyflink/datastream/formats/tests/test_parquet.py
index 16bf47edbfe..d7ee2cdd30e 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_parquet.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_parquet.py
@@ -44,7 +44,7 @@ from pyflink.datastream.formats.tests.test_avro import \
     _create_basic_avro_schema_and_records, _import_avro_classes
 from pyflink.datastream.formats.avro import GenericRecordAvroTypeInfo, AvroSchema
 from pyflink.datastream.formats.parquet import AvroParquetReaders, ParquetColumnarRowInputFormat, \
-    AvroParquetWriters, ParquetBulkWriter
+    AvroParquetWriters, ParquetBulkWriters
 from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
 from pyflink.datastream.utils import create_hadoop_configuration
 from pyflink.java_gateway import get_gateway
@@ -270,7 +270,7 @@ class FileSinkParquetBulkWriterTests(PyFlinkStreamingTestCase):
         conversion_type_info: Optional[RowTypeInfo] = None,
     ):
         sink = FileSink.for_bulk_format(
-            self.parquet_dir_name, ParquetBulkWriter.for_row_type(row_type, utc_timestamp=True)
+            self.parquet_dir_name, ParquetBulkWriters.for_row_type(row_type, utc_timestamp=True)
         ).build()
         ds = self.env.from_collection(data, type_info=row_type_info)
         if conversion_type_info:
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index 97bce229b04..31cfaf38bd1 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -18,12 +18,13 @@
 import os
 import tempfile
 
-from typing import List, Any, Optional, cast, TYPE_CHECKING
+from typing import List, Any, Optional, cast
 
 from py4j.java_gateway import JavaObject
 
 from pyflink.common import Configuration, WatermarkStrategy
 from pyflink.common.execution_config import ExecutionConfig
+from pyflink.common.io import InputFormat
 from pyflink.common.job_client import JobClient
 from pyflink.common.job_execution_result import JobExecutionResult
 from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
@@ -43,8 +44,6 @@ from pyflink.serializers import PickleSerializer
 from pyflink.util.java_utils import load_java_class, add_jars_to_context_class_loader, \
     invoke_method, get_field_value, is_local_deployment, get_j_env_configuration
 
-if TYPE_CHECKING:
-    from pyflink.datastream.connectors.file_system import InputFormat
 
 __all__ = ['StreamExecutionEnvironment']
 
@@ -837,7 +836,7 @@ class StreamExecutionEnvironment(object):
 
         return StreamExecutionEnvironment(j_stream_exection_environment)
 
-    def create_input(self, input_format: 'InputFormat',
+    def create_input(self, input_format: InputFormat,
                      type_info: Optional[TypeInformation] = None):
         """
         Create an input data stream with InputFormat.
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index c80ab5f7d9b..1d3a94d3d3a 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -27,13 +27,13 @@ import unittest
 import uuid
 
 from pyflink.common import Configuration, ExecutionConfig, RestartStrategies
-from pyflink.common.serialization import JsonRowDeserializationSchema
 from pyflink.common.typeinfo import Types
 from pyflink.datastream import (StreamExecutionEnvironment, CheckpointConfig,
                                 CheckpointingMode, MemoryStateBackend, TimeCharacteristic,
                                 SlotSharingGroup)
 from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
 from pyflink.datastream.execution_mode import RuntimeExecutionMode
+from pyflink.datastream.formats.json import JsonRowDeserializationSchema
 from pyflink.datastream.functions import SourceFunction
 from pyflink.datastream.slot_sharing_group import MemorySize
 from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
diff --git a/flink-python/pyflink/datastream/utils.py b/flink-python/pyflink/datastream/utils.py
index efbdd1b5da1..caad21f88f0 100644
--- a/flink-python/pyflink/datastream/utils.py
+++ b/flink-python/pyflink/datastream/utils.py
@@ -34,15 +34,6 @@ class ResultTypeQueryable(object):
         pass
 
 
-class JavaObjectWrapper(object):
-
-    def __init__(self, j_object):
-        self._j_object = j_object
-
-    def get_java_object(self):
-        return self._j_object
-
-
 def create_hadoop_configuration(config: Configuration):
     jvm = get_gateway().jvm
     hadoop_config = jvm.org.apache.hadoop.conf.Configuration()
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
index dd3711aff90..2a66bc4e8b1 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
@@ -18,9 +18,10 @@
 import logging
 import sys
 
-from pyflink.common import AvroRowSerializationSchema, Types, AvroRowDeserializationSchema
+from pyflink.common import Types
 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.formats.avro import AvroRowSerializationSchema, AvroRowDeserializationSchema
 
 
 # Make sure that the Kafka cluster is started and the topic 'test_avro_topic' is
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
index 99d5d62e90c..4dbb243fcf9 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
@@ -18,9 +18,11 @@
 import logging
 import sys
 
-from pyflink.common import Types, JsonRowDeserializationSchema, CsvRowSerializationSchema
+from pyflink.common import Types
 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.formats.csv import CsvRowSerializationSchema
+from pyflink.datastream.formats.json import JsonRowDeserializationSchema
 
 
 # Make sure that the Kafka cluster is started and the topic 'test_csv_topic' is
diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
index cf650fdb731..3cae241ba43 100644
--- a/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
+++ b/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
@@ -18,9 +18,10 @@
 import logging
 import sys
 
-from pyflink.common import Types, JsonRowDeserializationSchema, JsonRowSerializationSchema
+from pyflink.common import Types
 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema
 
 
 # Make sure that the Kafka cluster is started and the topic 'test_json_topic' is