You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/07/18 01:49:25 UTC

[flink] branch master updated: [FLINK-28464][python][csv] Support CsvReaderFormat

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cef3aa136ed [FLINK-28464][python][csv] Support CsvReaderFormat
cef3aa136ed is described below

commit cef3aa136edb555b229950fb09067e042dd4361f
Author: Juntao Hu <ma...@gmail.com>
AuthorDate: Fri Jul 8 11:33:37 2022 +0800

    [FLINK-28464][python][csv] Support CsvReaderFormat
    
    This closes #20220.
---
 .../docs/connectors/datastream/formats/csv.md      |  18 ++
 .../docs/connectors/datastream/formats/csv.md      |  18 ++
 flink-python/pyflink/datastream/__init__.py        |  13 +
 .../connectors/tests/test_file_system.py           | 152 +++++++++-
 .../pyflink/datastream/formats/__init__.py         |   5 +
 flink-python/pyflink/datastream/formats/base.py    |   3 +
 flink-python/pyflink/datastream/formats/csv.py     | 323 +++++++++++++++++++++
 flink-python/pyflink/datastream/formats/parquet.py |   4 +-
 flink-python/pyflink/pyflink_gateway_server.py     |   1 -
 9 files changed, 533 insertions(+), 4 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/formats/csv.md b/docs/content.zh/docs/connectors/datastream/formats/csv.md
index 96cd5143e3f..870dde3488d 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/csv.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/csv.md
@@ -38,6 +38,8 @@ To use the CSV format you need to add the Flink CSV dependency to your project:
 </dependency>
 ```
 
+For PyFlink users, you could use it directly in your jobs.
+
 Flink supports reading CSV files using `CsvReaderFormat`. The reader utilizes Jackson library and allows passing the corresponding configuration for the CSV schema and parsing options.
 
 `CsvReaderFormat` can be initialized and used like this:
@@ -113,6 +115,22 @@ CsvReaderFormat<ComplexPojo> csvFormat =
                         .build(),
                 TypeInformation.of(ComplexPojo.class));
 ```
+
+For PyFlink users, a csv schema can be defined by manually adding columns, and the output type of the csv source will be a Row with each column mapped to a field.
+```python
+schema = CsvSchema.builder() \
+    .add_number_column('id', number_type=DataTypes.BIGINT()) \
+    .add_array_column('array', separator='#', element_type=DataTypes.INT()) \
+    .set_column_separator(',') \
+    .build()
+
+source = FileSource.for_record_stream_format(
+    CsvReaderFormat.for_schema(schema), CSV_FILE_PATH).build()
+
+# the type of record will be Types.ROW_NAMED(['id', 'array'], [Types.LONG(), Types.LIST(Types.INT())])
+ds = env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')
+```
+
 The corresponding CSV file:
 ```
 0,1#2#3
diff --git a/docs/content/docs/connectors/datastream/formats/csv.md b/docs/content/docs/connectors/datastream/formats/csv.md
index 96cd5143e3f..870dde3488d 100644
--- a/docs/content/docs/connectors/datastream/formats/csv.md
+++ b/docs/content/docs/connectors/datastream/formats/csv.md
@@ -38,6 +38,8 @@ To use the CSV format you need to add the Flink CSV dependency to your project:
 </dependency>
 ```
 
+For PyFlink users, you could use it directly in your jobs.
+
 Flink supports reading CSV files using `CsvReaderFormat`. The reader utilizes Jackson library and allows passing the corresponding configuration for the CSV schema and parsing options.
 
 `CsvReaderFormat` can be initialized and used like this:
@@ -113,6 +115,22 @@ CsvReaderFormat<ComplexPojo> csvFormat =
                         .build(),
                 TypeInformation.of(ComplexPojo.class));
 ```
+
+For PyFlink users, a csv schema can be defined by manually adding columns, and the output type of the csv source will be a Row with each column mapped to a field.
+```python
+schema = CsvSchema.builder() \
+    .add_number_column('id', number_type=DataTypes.BIGINT()) \
+    .add_array_column('array', separator='#', element_type=DataTypes.INT()) \
+    .set_column_separator(',') \
+    .build()
+
+source = FileSource.for_record_stream_format(
+    CsvReaderFormat.for_schema(schema), CSV_FILE_PATH).build()
+
+# the type of record will be Types.ROW_NAMED(['id', 'array'], [Types.LONG(), Types.LIST(Types.INT())])
+ds = env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')
+```
+
 The corresponding CSV file:
 ```
 0,1#2#3
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index be54f5bc301..2c4a9813d74 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -190,6 +190,19 @@ Classes to define source & sink:
     - :class:`connectors.RMQSink`:
       A Sink for publishing data into RabbitMQ.
 
+Classes to define formats used together with source & sink:
+
+    - :class:`formats.CsvReaderFormat`:
+      A :class:`connectors.StreamFormat` to read csv files into Row data.
+    - :class:`formats.AvroInputFormat`:
+      An :class:`formats.InputFormat` to read avro files.
+    - :class:`formats.ParquetColumnarRowInputFormat`:
+      A :class:`connectors.BulkFormat` to read columnar parquet files into Row data in a
+      batch-processing fashion.
+    - :class:`formats.AvroParquetReaders`:
+      A convenience builder to create reader format that reads individual Avro records from a
+      Parquet stream. Only GenericRecord is supported in PyFlink.
+
 Other important classes:
 
     - :class:`TimeCharacteristic`:
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_file_system.py b/flink-python/pyflink/datastream/connectors/tests/test_file_system.py
index d2ce2a47495..501d4041363 100644
--- a/flink-python/pyflink/datastream/connectors/tests/test_file_system.py
+++ b/flink-python/pyflink/datastream/connectors/tests/test_file_system.py
@@ -22,8 +22,9 @@ from typing import Tuple, List
 
 from py4j.java_gateway import java_import, JavaObject
 
-from pyflink.common import Configuration
+from pyflink.common import Types, Configuration
 from pyflink.common.watermark_strategy import WatermarkStrategy
+from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
 from pyflink.datastream.functions import MapFunction
 from pyflink.datastream.connectors.file_system import FileSource
 from pyflink.datastream.formats.avro import AvroSchema, AvroInputFormat
@@ -34,6 +35,155 @@ from pyflink.table.types import RowType, DataTypes
 from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
 
 
+class FileSourceCsvReaderFormatTests(PyFlinkStreamingTestCase):
+
+    def setUp(self):
+        super().setUp()
+        self.test_sink = DataStreamTestSinkFunction()
+        self.csv_file_name = tempfile.mktemp(suffix='.csv', dir=self.tempdir)
+
+    def test_csv_primitive_column(self):
+        schema = CsvSchema.builder() \
+            .add_number_column('tinyint', DataTypes.TINYINT()) \
+            .add_number_column('smallint', DataTypes.SMALLINT()) \
+            .add_number_column('int', DataTypes.INT()) \
+            .add_number_column('bigint', DataTypes.BIGINT()) \
+            .add_number_column('float', DataTypes.FLOAT()) \
+            .add_number_column('double', DataTypes.DOUBLE()) \
+            .add_number_column('decimal', DataTypes.DECIMAL(2, 0)) \
+            .add_boolean_column('boolean') \
+            .add_string_column('string') \
+            .build()
+        with open(self.csv_file_name, 'w') as f:
+            f.write('127,')
+            f.write('-32767,')
+            f.write('2147483647,')
+            f.write('-9223372036854775808,')
+            f.write('3e38,')
+            f.write('2e-308,')
+            f.write('1.5,')
+            f.write('true,')
+            f.write('string\n')
+        self._build_csv_job(schema)
+        self.env.execute('test_csv_primitive_column')
+        row = self.test_sink.get_results(True, False)[0]
+        self.assertEqual(row['tinyint'], 127)
+        self.assertEqual(row['smallint'], -32767)
+        self.assertEqual(row['int'], 2147483647)
+        self.assertEqual(row['bigint'], -9223372036854775808)
+        self.assertAlmostEqual(row['float'], 3e38, delta=1e31)
+        self.assertAlmostEqual(row['double'], 2e-308, delta=2e-301)
+        self.assertAlmostEqual(row['decimal'], 2)
+        self.assertEqual(row['boolean'], True)
+        self.assertEqual(row['string'], 'string')
+
+    def test_csv_array_column(self):
+        schema = CsvSchema.builder() \
+            .add_array_column('number_array', separator=';', element_type=DataTypes.INT()) \
+            .add_array_column('boolean_array', separator=':', element_type=DataTypes.BOOLEAN()) \
+            .add_array_column('string_array', separator=',', element_type=DataTypes.STRING()) \
+            .set_column_separator('|') \
+            .build()
+        with open(self.csv_file_name, 'w') as f:
+            f.write('1;2;3|')
+            f.write('true:false|')
+            f.write('a,b,c\n')
+        self._build_csv_job(schema)
+        self.env.execute('test_csv_array_column')
+        row = self.test_sink.get_results(True, False)[0]
+        self.assertListEqual(row['number_array'], [1, 2, 3])
+        self.assertListEqual(row['boolean_array'], [True, False])
+        self.assertListEqual(row['string_array'], ['a', 'b', 'c'])
+
+    def test_csv_allow_comments(self):
+        schema = CsvSchema.builder() \
+            .add_string_column('string') \
+            .set_allow_comments() \
+            .build()
+        with open(self.csv_file_name, 'w') as f:
+            f.write('a\n')
+            f.write('# this is comment\n')
+            f.write('b\n')
+        self._build_csv_job(schema)
+        self.env.execute('test_csv_allow_comments')
+        rows = self.test_sink.get_results(True, False)
+        self.assertEqual(rows[0]['string'], 'a')
+        self.assertEqual(rows[1]['string'], 'b')
+
+    def test_csv_use_header(self):
+        schema = CsvSchema.builder() \
+            .add_string_column('string') \
+            .add_number_column('number') \
+            .set_use_header() \
+            .build()
+        with open(self.csv_file_name, 'w') as f:
+            f.write('h1,h2\n')
+            f.write('string,123\n')
+        self._build_csv_job(schema)
+        self.env.execute('test_csv_use_header')
+        row = self.test_sink.get_results(True, False)[0]
+        self.assertEqual(row['string'], 'string')
+        self.assertEqual(row['number'], 123)
+
+    def test_csv_strict_headers(self):
+        schema = CsvSchema.builder() \
+            .add_string_column('string') \
+            .add_number_column('number') \
+            .set_use_header() \
+            .set_strict_headers() \
+            .build()
+        with open(self.csv_file_name, 'w') as f:
+            f.write('string,number\n')
+            f.write('string,123\n')
+        self._build_csv_job(schema)
+        self.env.execute('test_csv_strict_headers')
+        row = self.test_sink.get_results(True, False)[0]
+        self.assertEqual(row['string'], 'string')
+        self.assertEqual(row['number'], 123)
+
+    def test_csv_default_quote_char(self):
+        schema = CsvSchema.builder() \
+            .add_string_column('string') \
+            .build()
+        with open(self.csv_file_name, 'w') as f:
+            f.write('"string"\n')
+        self._build_csv_job(schema)
+        self.env.execute('test_csv_default_quote_char')
+        row = self.test_sink.get_results(True, False)[0]
+        self.assertEqual(row['string'], 'string')
+
+    def test_csv_customize_quote_char(self):
+        schema = CsvSchema.builder() \
+            .add_string_column('string') \
+            .set_quote_char('`') \
+            .build()
+        with open(self.csv_file_name, 'w') as f:
+            f.write('`string`\n')
+        self._build_csv_job(schema)
+        self.env.execute('test_csv_customize_quote_char')
+        row = self.test_sink.get_results(True, False)[0]
+        self.assertEqual(row['string'], 'string')
+
+    def test_csv_use_escape_char(self):
+        schema = CsvSchema.builder() \
+            .add_string_column('string') \
+            .set_escape_char('\\') \
+            .build()
+        with open(self.csv_file_name, 'w') as f:
+            f.write('\\"string\\"\n')
+        self._build_csv_job(schema)
+        self.env.execute('test_csv_use_escape_char')
+        row = self.test_sink.get_results(True, False)[0]
+        self.assertEqual(row['string'], '"string"')
+
+    def _build_csv_job(self, schema):
+        source = FileSource.for_record_stream_format(
+            CsvReaderFormat.for_schema(schema), self.csv_file_name).build()
+        ds = self.env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')
+        ds.map(PassThroughMapFunction(), output_type=Types.PICKLED_BYTE_ARRAY()) \
+            .add_sink(self.test_sink)
+
+
 @unittest.skipIf(os.environ.get('HADOOP_CLASSPATH') is None,
                  'Some Hadoop lib is needed for Parquet Columnar format tests')
 class FileSourceParquetColumnarFormatTests(PyFlinkStreamingTestCase):
diff --git a/flink-python/pyflink/datastream/formats/__init__.py b/flink-python/pyflink/datastream/formats/__init__.py
index aafb2d99dd8..37ad31a4ab7 100644
--- a/flink-python/pyflink/datastream/formats/__init__.py
+++ b/flink-python/pyflink/datastream/formats/__init__.py
@@ -16,11 +16,16 @@
 # limitations under the License.
 ################################################################################
 from .avro import AvroSchema, AvroInputFormat
+from .base import InputFormat
+from .csv import CsvSchema, CsvReaderFormat
 from .parquet import AvroParquetReaders, ParquetColumnarRowInputFormat
 
 __all__ = [
+    'InputFormat',
     'AvroInputFormat',
     'AvroParquetReaders',
     'AvroSchema',
+    'CsvReaderFormat',
+    'CsvSchema',
     'ParquetColumnarRowInputFormat'
 ]
diff --git a/flink-python/pyflink/datastream/formats/base.py b/flink-python/pyflink/datastream/formats/base.py
index f2b17bcba39..ac0500d0d62 100644
--- a/flink-python/pyflink/datastream/formats/base.py
+++ b/flink-python/pyflink/datastream/formats/base.py
@@ -22,6 +22,9 @@ from py4j.java_gateway import JavaObject
 from pyflink.datastream.functions import JavaFunctionWrapper
 
 
+__all__ = ['InputFormat']
+
+
 class InputFormat(JavaFunctionWrapper):
     """
     The Python wrapper of Java InputFormat interface, which is the base interface for data sources
diff --git a/flink-python/pyflink/datastream/formats/csv.py b/flink-python/pyflink/datastream/formats/csv.py
new file mode 100644
index 00000000000..82f405da191
--- /dev/null
+++ b/flink-python/pyflink/datastream/formats/csv.py
@@ -0,0 +1,323 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Optional, cast
+
+from py4j.java_gateway import get_java_class
+from pyflink.datastream.connectors import StreamFormat
+from pyflink.java_gateway import get_gateway
+from pyflink.table.types import DataType, DataTypes, _to_java_data_type, RowType, NumericType
+from pyflink.util.java_utils import to_jarray
+
+
+class CsvSchema(object):
+    """
+    CsvSchema holds schema information of a csv file, corresponding to Java
+    ``com.fasterxml.jackson.dataformat.csv.CsvSchema`` class.
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self, j_schema, data_type: DataType):
+        self._j_schema = j_schema
+        self._data_type = data_type
+
+    @staticmethod
+    def builder() -> 'CsvSchemaBuilder':
+        """
+        Returns a :class:`CsvSchemaBuilder`.
+        """
+        return CsvSchemaBuilder()
+
+    def size(self):
+        return self._j_schema.size()
+
+    def __len__(self):
+        return self.size()
+
+    def __str__(self):
+        return self._j_schema.toString()
+
+    def __repr__(self):
+        return str(self)
+
+
+class CsvSchemaBuilder(object):
+    """
+    CsvSchemaBuilder is for building a :class:`CsvSchemaBuilder`, corresponding to Java
+    ``com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder`` class.
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self):
+        jvm = get_gateway().jvm
+        self._j_schema_builder = jvm.org.apache.flink.shaded.jackson2.com.fasterxml.jackson \
+            .dataformat.csv.CsvSchema.builder()
+        self._fields = []
+
+    def build(self) -> 'CsvSchema':
+        """
+        Build the :class:`CsvSchema`.
+        """
+        return CsvSchema(self._j_schema_builder.build(), DataTypes.ROW(self._fields))
+
+    def add_array_column(self,
+                         name: str,
+                         separator: str = ';',
+                         element_type: Optional[DataType] = DataTypes.STRING()) \
+            -> 'CsvSchemaBuilder':
+        """
+        Add an array column to schema, the type of elements could be specified via ``element_type``,
+        which should be primitive types.
+
+        :param name: Name of the column.
+        :param separator: Text separator of array elements, default to ``;``.
+        :param element_type: DataType of array elements, default to ``DataTypes.STRING()``.
+        """
+        self._j_schema_builder.addArrayColumn(name, separator)
+        self._fields.append(DataTypes.FIELD(name, DataTypes.ARRAY(element_type)))
+        return self
+
+    def add_boolean_column(self, name: str) -> 'CsvSchemaBuilder':
+        """
+        Add a boolean column to schema, with type as ``DataTypes.BOOLEAN()``.
+
+        :param name: Name of the column.
+        """
+        self._j_schema_builder.addBooleanColumn(name)
+        self._fields.append(DataTypes.FIELD(name, DataTypes.BOOLEAN()))
+        return self
+
+    def add_number_column(self, name: str,
+                          number_type: Optional[NumericType] = DataTypes.BIGINT()) \
+            -> 'CsvSchemaBuilder':
+        """
+        Add a number column to schema, the type of number could be specified via ``number_type``.
+
+        :param name: Name of the column.
+        :param number_type: DataType of the number, default to ``DataTypes.BIGINT()``.
+        """
+        self._j_schema_builder.addNumberColumn(name)
+        self._fields.append(DataTypes.FIELD(name, number_type))
+        return self
+
+    def add_string_column(self, name: str) -> 'CsvSchemaBuilder':
+        """
+        Add a string column to schema, with type as ``DataTypes.STRING()``.
+
+        :param name: Name of the column.
+        """
+        self._j_schema_builder.addColumn(name)
+        self._fields.append(DataTypes.FIELD(name, DataTypes.STRING()))
+        return self
+
+    def add_columns_from(self, schema: 'CsvSchema') -> 'CsvSchemaBuilder':
+        """
+        Add all columns in ``schema`` to current schema.
+
+        :param schema: Another :class:`CsvSchema`.
+        """
+        self._j_schema_builder.addColumnsFrom(schema._j_schema)
+        for field in cast(schema._data_type, RowType):
+            self._fields.append(field)
+        return self
+
+    def clear_columns(self):
+        """
+        Delete all columns in the schema.
+        """
+        self._j_schema_builder.clearColumns()
+        self._fields.clear()
+        return self
+
+    def set_allow_comments(self, allow: bool = True):
+        """
+        Allow using ``#`` prefixed comments in csv file.
+        """
+        self._j_schema_builder.setAllowComments(allow)
+        return self
+
+    def set_any_property_name(self, name: str):
+        self._j_schema_builder.setAnyPropertyName(name)
+        return self
+
+    def disable_array_element_separator(self):
+        """
+        Set array element separator to ``""``.
+        """
+        self._j_schema_builder.disableArrayElementSeparator()
+        return self
+
+    def remove_array_element_separator(self, index: int):
+        """
+        Set array element separator of a column specified by ``index`` to ``""``.
+        """
+        self._j_schema_builder.removeArrayElementSeparator(index)
+        return self
+
+    def set_array_element_separator(self, separator: str):
+        """
+        Set global array element separator, default to ``;``.
+        """
+        self._j_schema_builder.setArrayElementSeparator(separator)
+        return self
+
+    def set_column_separator(self, char: str):
+        """
+        Set column separator, ``char`` should be a single char, default to ``,``.
+        """
+        if len(char) != 1:
+            raise ValueError('Column separator must be a single char, got {}'.format(char))
+        self._j_schema_builder.setColumnSeparator(char)
+        return self
+
+    def disable_escape_char(self):
+        """
+        Disable escaping in csv file.
+        """
+        self._j_schema_builder.disableEscapeChar()
+        return self
+
+    def set_escape_char(self, char: str):
+        """
+        Set escape char, ``char`` should be a single char, default to no-escaping.
+        """
+        if len(char) != 1:
+            raise ValueError('Escape char must be a single char, got {}'.format(char))
+        self._j_schema_builder.setEscapeChar(char)
+        return self
+
+    def set_line_separator(self, separator: str):
+        """
+        Set line separator, default to ``\\n``. This is only configurable for writing, for reading,
+        ``\\n``, ``\\r``, ``\\r\\n`` are recognized.
+        """
+        self._j_schema_builder.setLineSeparator(separator)
+        return self
+
+    def set_null_value(self, null_value: str):
+        """
+        Set literal for null value, default to empty sequence.
+        """
+        self._j_schema_builder.setNullValue(null_value)
+
+    def disable_quote_char(self):
+        """
+        Disable quote char.
+        """
+        self._j_schema_builder.disableQuoteChar()
+        return self
+
+    def set_quote_char(self, char: str):
+        """
+        Set quote char, default to ``"``.
+        """
+        if len(char) != 1:
+            raise ValueError('Quote char must be a single char, got {}'.format(char))
+        self._j_schema_builder.setQuoteChar(char)
+        return self
+
+    def set_skip_first_data_row(self, skip: bool = True):
+        """
+        Set whether to skip the first row of csv file.
+        """
+        self._j_schema_builder.setSkipFirstDataRow(skip)
+        return self
+
+    def set_strict_headers(self, strict: bool = True):
+        """
+        Set whether to use strict headers, which check column names in the header are consistent
+        with the schema.
+        """
+        self._j_schema_builder.setStrictHeaders(strict)
+        return self
+
+    def set_use_header(self, use: bool = True):
+        """
+        Set whether to read header.
+        """
+        self._j_schema_builder.setUseHeader(use)
+        return self
+
+    def size(self):
+        return len(self._fields)
+
+    def __len__(self):
+        return self.size()
+
+
+class CsvReaderFormat(StreamFormat):
+    """
+    The :class:`StreamFormat` for reading csv files.
+
+    Example:
+    ::
+
+        >>> schema = CsvSchema.builder() \\
+        ...     .add_number_column('id', number_type=DataTypes.INT()) \\
+        ...     .add_string_column('name') \\
+        ...     .add_array_column('list', ',', element_type=DataTypes.STRING()) \\
+        ...     .set_column_separator('|') \\
+        ...     .set_escape_char('\\\\') \\
+        ...     .set_use_header() \\
+        ...     .set_strict_headers() \\
+        ...     .build()
+        >>> source = FileSource.for_record_stream_format(
+        ...     CsvReaderFormat.for_schema(schema), CSV_FILE_PATH).build()
+        >>> ds = env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')
+        >>> # the type of records is Types.ROW_NAMED(['id', 'name', 'list'],
+        >>> #   [Types.INT(), Types.STRING(), Types.LIST(Types.STRING())])
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self, j_csv_format):
+        super().__init__(j_csv_format)
+
+    @staticmethod
+    def for_schema(schema: 'CsvSchema') -> 'CsvReaderFormat':
+        """
+        Builds a :class:`CsvReaderFormat` using `CsvSchema`.
+        """
+        jvm = get_gateway().jvm
+        jackson = jvm.org.apache.flink.shaded.jackson2.com.fasterxml.jackson
+        constructor = get_java_class(jvm.org.apache.flink.formats.csv.CsvReaderFormat) \
+            .getDeclaredConstructor(
+            to_jarray(jvm.Class, [
+                get_java_class(jackson.dataformat.csv.CsvMapper),
+                get_java_class(jackson.dataformat.csv.CsvSchema),
+                get_java_class(jvm.Class),
+                get_java_class(jvm.org.apache.flink.formats.common.Converter),
+                get_java_class(jvm.org.apache.flink.api.common.typeinfo.TypeInformation),
+                get_java_class(jvm.boolean)
+            ])
+        )
+        constructor.setAccessible(True)
+        j_csv_format = constructor.newInstance(
+            to_jarray(jvm.Object, [
+                jackson.dataformat.csv.CsvMapper(),
+                schema._j_schema,
+                get_java_class(jackson.databind.JsonNode),
+                jvm.org.apache.flink.formats.csv.CsvToRowDataConverters(False).createRowConverter(
+                    _to_java_data_type(schema._data_type).getLogicalType(), True),
+                jvm.org.apache.flink.table.runtime.typeutils.InternalTypeInfo.of(
+                    _to_java_data_type(schema._data_type).getLogicalType()),
+                False
+            ])
+        )
+        return CsvReaderFormat(j_csv_format)
diff --git a/flink-python/pyflink/datastream/formats/parquet.py b/flink-python/pyflink/datastream/formats/parquet.py
index acc1739bede..dea57894f1e 100644
--- a/flink-python/pyflink/datastream/formats/parquet.py
+++ b/flink-python/pyflink/datastream/formats/parquet.py
@@ -24,8 +24,8 @@ from pyflink.table.types import RowType, _to_java_data_type
 
 class AvroParquetReaders(object):
     """
-    A convenience builder to create AvroParquetRecordFormat instances for the different kinds of
-    Avro record types. Only GenericRecord is supported in PyFlink.
+    A convenience builder to create reader format that reads individual Avro records from a
+    Parquet stream. Only GenericRecord is supported in PyFlink.
 
     .. versionadded:: 1.16.0
     """
diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py
index 2ee5307aa07..eabaa25566d 100644
--- a/flink-python/pyflink/pyflink_gateway_server.py
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -215,7 +215,6 @@ def construct_test_classpath():
     test_jar_patterns = [
         "flink-runtime/target/flink-runtime*tests.jar",
         "flink-streaming-java/target/flink-streaming-java*tests.jar",
-        "flink-formats/flink-csv/target/flink-csv*.jar",
         "flink-formats/flink-sql-avro/target/flink-sql-avro*.jar",
         "flink-formats/flink-sql-parquet/target/flink-sql-parquet*.jar",
         "flink-formats/flink-json/target/flink-json*.jar",