Posted to commits@flink.apache.org by di...@apache.org on 2022/07/18 01:49:25 UTC
[flink] branch master updated: [FLINK-28464][python][csv] Support CsvReaderFormat
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cef3aa136ed [FLINK-28464][python][csv] Support CsvReaderFormat
cef3aa136ed is described below
commit cef3aa136edb555b229950fb09067e042dd4361f
Author: Juntao Hu <ma...@gmail.com>
AuthorDate: Fri Jul 8 11:33:37 2022 +0800
[FLINK-28464][python][csv] Support CsvReaderFormat
This closes #20220.
---
.../docs/connectors/datastream/formats/csv.md | 18 ++
.../docs/connectors/datastream/formats/csv.md | 18 ++
flink-python/pyflink/datastream/__init__.py | 13 +
.../connectors/tests/test_file_system.py | 152 +++++++++-
.../pyflink/datastream/formats/__init__.py | 5 +
flink-python/pyflink/datastream/formats/base.py | 3 +
flink-python/pyflink/datastream/formats/csv.py | 323 +++++++++++++++++++++
flink-python/pyflink/datastream/formats/parquet.py | 4 +-
flink-python/pyflink/pyflink_gateway_server.py | 1 -
9 files changed, 533 insertions(+), 4 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/formats/csv.md b/docs/content.zh/docs/connectors/datastream/formats/csv.md
index 96cd5143e3f..870dde3488d 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/csv.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/csv.md
@@ -38,6 +38,8 @@ To use the CSV format you need to add the Flink CSV dependency to your project:
</dependency>
```
+For PyFlink users, the CSV format can be used directly in jobs without adding the dependency above.
+
Flink supports reading CSV files using `CsvReaderFormat`. The reader utilizes Jackson library and allows passing the corresponding configuration for the CSV schema and parsing options.
`CsvReaderFormat` can be initialized and used like this:
@@ -113,6 +115,22 @@ CsvReaderFormat<ComplexPojo> csvFormat =
.build(),
TypeInformation.of(ComplexPojo.class));
```
+
+For PyFlink users, a CSV schema can be defined by adding columns manually; the output type of the CSV source is a `Row`, with each column mapped to a field.
+```python
+schema = CsvSchema.builder() \
+ .add_number_column('id', number_type=DataTypes.BIGINT()) \
+ .add_array_column('array', separator='#', element_type=DataTypes.INT()) \
+ .set_column_separator(',') \
+ .build()
+
+source = FileSource.for_record_stream_format(
+ CsvReaderFormat.for_schema(schema), CSV_FILE_PATH).build()
+
+# the type of record will be Types.ROW_NAMED(['id', 'array'], [Types.LONG(), Types.LIST(Types.INT())])
+ds = env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')
+```
+
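+Each produced record is a `Row`, so its fields can be accessed by name in downstream operators. A minimal sketch (assuming the `ds` stream defined above; `Types` comes from `pyflink.common`):
+```python
+# count the elements of the 'array' field of each record and print the result
+ds.map(lambda row: len(row['array']), output_type=Types.INT()).print()
+```
+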
The corresponding CSV file:
```
0,1#2#3
diff --git a/docs/content/docs/connectors/datastream/formats/csv.md b/docs/content/docs/connectors/datastream/formats/csv.md
index 96cd5143e3f..870dde3488d 100644
--- a/docs/content/docs/connectors/datastream/formats/csv.md
+++ b/docs/content/docs/connectors/datastream/formats/csv.md
@@ -38,6 +38,8 @@ To use the CSV format you need to add the Flink CSV dependency to your project:
</dependency>
```
+For PyFlink users, the CSV format can be used directly in jobs without adding the dependency above.
+
Flink supports reading CSV files using `CsvReaderFormat`. The reader utilizes Jackson library and allows passing the corresponding configuration for the CSV schema and parsing options.
`CsvReaderFormat` can be initialized and used like this:
@@ -113,6 +115,22 @@ CsvReaderFormat<ComplexPojo> csvFormat =
.build(),
TypeInformation.of(ComplexPojo.class));
```
+
+For PyFlink users, a CSV schema can be defined by adding columns manually; the output type of the CSV source is a `Row`, with each column mapped to a field.
+```python
+schema = CsvSchema.builder() \
+ .add_number_column('id', number_type=DataTypes.BIGINT()) \
+ .add_array_column('array', separator='#', element_type=DataTypes.INT()) \
+ .set_column_separator(',') \
+ .build()
+
+source = FileSource.for_record_stream_format(
+ CsvReaderFormat.for_schema(schema), CSV_FILE_PATH).build()
+
+# the type of record will be Types.ROW_NAMED(['id', 'array'], [Types.LONG(), Types.LIST(Types.INT())])
+ds = env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')
+```
+
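+Each produced record is a `Row`, so its fields can be accessed by name in downstream operators. A minimal sketch (assuming the `ds` stream defined above; `Types` comes from `pyflink.common`):
+```python
+# count the elements of the 'array' field of each record and print the result
+ds.map(lambda row: len(row['array']), output_type=Types.INT()).print()
+```
+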
The corresponding CSV file:
```
0,1#2#3
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index be54f5bc301..2c4a9813d74 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -190,6 +190,19 @@ Classes to define source & sink:
- :class:`connectors.RMQSink`:
A Sink for publishing data into RabbitMQ.
+Classes to define formats used together with source & sink:
+
+ - :class:`formats.CsvReaderFormat`:
+    A :class:`connectors.StreamFormat` to read CSV files into Row data.
+ - :class:`formats.AvroInputFormat`:
+    An :class:`formats.InputFormat` to read Avro files.
+ - :class:`formats.ParquetColumnarRowInputFormat`:
+    A :class:`connectors.BulkFormat` to read columnar Parquet files into Row data in a
+ batch-processing fashion.
+ - :class:`formats.AvroParquetReaders`:
+    A convenience builder to create a reader format that reads individual Avro records from a
+ Parquet stream. Only GenericRecord is supported in PyFlink.
+
Other important classes:
- :class:`TimeCharacteristic`:
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_file_system.py b/flink-python/pyflink/datastream/connectors/tests/test_file_system.py
index d2ce2a47495..501d4041363 100644
--- a/flink-python/pyflink/datastream/connectors/tests/test_file_system.py
+++ b/flink-python/pyflink/datastream/connectors/tests/test_file_system.py
@@ -22,8 +22,9 @@ from typing import Tuple, List
from py4j.java_gateway import java_import, JavaObject
-from pyflink.common import Configuration
+from pyflink.common import Types, Configuration
from pyflink.common.watermark_strategy import WatermarkStrategy
+from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
from pyflink.datastream.functions import MapFunction
from pyflink.datastream.connectors.file_system import FileSource
from pyflink.datastream.formats.avro import AvroSchema, AvroInputFormat
@@ -34,6 +35,155 @@ from pyflink.table.types import RowType, DataTypes
from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
+class FileSourceCsvReaderFormatTests(PyFlinkStreamingTestCase):
+
+ def setUp(self):
+ super().setUp()
+ self.test_sink = DataStreamTestSinkFunction()
+ self.csv_file_name = tempfile.mktemp(suffix='.csv', dir=self.tempdir)
+
+ def test_csv_primitive_column(self):
+ schema = CsvSchema.builder() \
+ .add_number_column('tinyint', DataTypes.TINYINT()) \
+ .add_number_column('smallint', DataTypes.SMALLINT()) \
+ .add_number_column('int', DataTypes.INT()) \
+ .add_number_column('bigint', DataTypes.BIGINT()) \
+ .add_number_column('float', DataTypes.FLOAT()) \
+ .add_number_column('double', DataTypes.DOUBLE()) \
+ .add_number_column('decimal', DataTypes.DECIMAL(2, 0)) \
+ .add_boolean_column('boolean') \
+ .add_string_column('string') \
+ .build()
+ with open(self.csv_file_name, 'w') as f:
+ f.write('127,')
+ f.write('-32767,')
+ f.write('2147483647,')
+ f.write('-9223372036854775808,')
+ f.write('3e38,')
+ f.write('2e-308,')
+ f.write('1.5,')
+ f.write('true,')
+ f.write('string\n')
+ self._build_csv_job(schema)
+ self.env.execute('test_csv_primitive_column')
+ row = self.test_sink.get_results(True, False)[0]
+ self.assertEqual(row['tinyint'], 127)
+ self.assertEqual(row['smallint'], -32767)
+ self.assertEqual(row['int'], 2147483647)
+ self.assertEqual(row['bigint'], -9223372036854775808)
+ self.assertAlmostEqual(row['float'], 3e38, delta=1e31)
+ self.assertAlmostEqual(row['double'], 2e-308, delta=2e-301)
+ self.assertAlmostEqual(row['decimal'], 2)
+ self.assertEqual(row['boolean'], True)
+ self.assertEqual(row['string'], 'string')
+
+ def test_csv_array_column(self):
+ schema = CsvSchema.builder() \
+ .add_array_column('number_array', separator=';', element_type=DataTypes.INT()) \
+ .add_array_column('boolean_array', separator=':', element_type=DataTypes.BOOLEAN()) \
+ .add_array_column('string_array', separator=',', element_type=DataTypes.STRING()) \
+ .set_column_separator('|') \
+ .build()
+ with open(self.csv_file_name, 'w') as f:
+ f.write('1;2;3|')
+ f.write('true:false|')
+ f.write('a,b,c\n')
+ self._build_csv_job(schema)
+ self.env.execute('test_csv_array_column')
+ row = self.test_sink.get_results(True, False)[0]
+ self.assertListEqual(row['number_array'], [1, 2, 3])
+ self.assertListEqual(row['boolean_array'], [True, False])
+ self.assertListEqual(row['string_array'], ['a', 'b', 'c'])
+
+ def test_csv_allow_comments(self):
+ schema = CsvSchema.builder() \
+ .add_string_column('string') \
+ .set_allow_comments() \
+ .build()
+ with open(self.csv_file_name, 'w') as f:
+ f.write('a\n')
+ f.write('# this is comment\n')
+ f.write('b\n')
+ self._build_csv_job(schema)
+ self.env.execute('test_csv_allow_comments')
+ rows = self.test_sink.get_results(True, False)
+ self.assertEqual(rows[0]['string'], 'a')
+ self.assertEqual(rows[1]['string'], 'b')
+
+ def test_csv_use_header(self):
+ schema = CsvSchema.builder() \
+ .add_string_column('string') \
+ .add_number_column('number') \
+ .set_use_header() \
+ .build()
+ with open(self.csv_file_name, 'w') as f:
+ f.write('h1,h2\n')
+ f.write('string,123\n')
+ self._build_csv_job(schema)
+ self.env.execute('test_csv_use_header')
+ row = self.test_sink.get_results(True, False)[0]
+ self.assertEqual(row['string'], 'string')
+ self.assertEqual(row['number'], 123)
+
+ def test_csv_strict_headers(self):
+ schema = CsvSchema.builder() \
+ .add_string_column('string') \
+ .add_number_column('number') \
+ .set_use_header() \
+ .set_strict_headers() \
+ .build()
+ with open(self.csv_file_name, 'w') as f:
+ f.write('string,number\n')
+ f.write('string,123\n')
+ self._build_csv_job(schema)
+ self.env.execute('test_csv_strict_headers')
+ row = self.test_sink.get_results(True, False)[0]
+ self.assertEqual(row['string'], 'string')
+ self.assertEqual(row['number'], 123)
+
+ def test_csv_default_quote_char(self):
+ schema = CsvSchema.builder() \
+ .add_string_column('string') \
+ .build()
+ with open(self.csv_file_name, 'w') as f:
+ f.write('"string"\n')
+ self._build_csv_job(schema)
+ self.env.execute('test_csv_default_quote_char')
+ row = self.test_sink.get_results(True, False)[0]
+ self.assertEqual(row['string'], 'string')
+
+ def test_csv_customize_quote_char(self):
+ schema = CsvSchema.builder() \
+ .add_string_column('string') \
+ .set_quote_char('`') \
+ .build()
+ with open(self.csv_file_name, 'w') as f:
+ f.write('`string`\n')
+ self._build_csv_job(schema)
+ self.env.execute('test_csv_customize_quote_char')
+ row = self.test_sink.get_results(True, False)[0]
+ self.assertEqual(row['string'], 'string')
+
+ def test_csv_use_escape_char(self):
+ schema = CsvSchema.builder() \
+ .add_string_column('string') \
+ .set_escape_char('\\') \
+ .build()
+ with open(self.csv_file_name, 'w') as f:
+ f.write('\\"string\\"\n')
+ self._build_csv_job(schema)
+ self.env.execute('test_csv_use_escape_char')
+ row = self.test_sink.get_results(True, False)[0]
+ self.assertEqual(row['string'], '"string"')
+
+ def _build_csv_job(self, schema):
+ source = FileSource.for_record_stream_format(
+ CsvReaderFormat.for_schema(schema), self.csv_file_name).build()
+ ds = self.env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')
+ ds.map(PassThroughMapFunction(), output_type=Types.PICKLED_BYTE_ARRAY()) \
+ .add_sink(self.test_sink)
+
+
@unittest.skipIf(os.environ.get('HADOOP_CLASSPATH') is None,
'Some Hadoop lib is needed for Parquet Columnar format tests')
class FileSourceParquetColumnarFormatTests(PyFlinkStreamingTestCase):
diff --git a/flink-python/pyflink/datastream/formats/__init__.py b/flink-python/pyflink/datastream/formats/__init__.py
index aafb2d99dd8..37ad31a4ab7 100644
--- a/flink-python/pyflink/datastream/formats/__init__.py
+++ b/flink-python/pyflink/datastream/formats/__init__.py
@@ -16,11 +16,16 @@
# limitations under the License.
################################################################################
from .avro import AvroSchema, AvroInputFormat
+from .base import InputFormat
+from .csv import CsvSchema, CsvReaderFormat
from .parquet import AvroParquetReaders, ParquetColumnarRowInputFormat
__all__ = [
+ 'InputFormat',
'AvroInputFormat',
'AvroParquetReaders',
'AvroSchema',
+ 'CsvReaderFormat',
+ 'CsvSchema',
'ParquetColumnarRowInputFormat'
]
diff --git a/flink-python/pyflink/datastream/formats/base.py b/flink-python/pyflink/datastream/formats/base.py
index f2b17bcba39..ac0500d0d62 100644
--- a/flink-python/pyflink/datastream/formats/base.py
+++ b/flink-python/pyflink/datastream/formats/base.py
@@ -22,6 +22,9 @@ from py4j.java_gateway import JavaObject
from pyflink.datastream.functions import JavaFunctionWrapper
+__all__ = ['InputFormat']
+
+
class InputFormat(JavaFunctionWrapper):
"""
The Python wrapper of Java InputFormat interface, which is the base interface for data sources
diff --git a/flink-python/pyflink/datastream/formats/csv.py b/flink-python/pyflink/datastream/formats/csv.py
new file mode 100644
index 00000000000..82f405da191
--- /dev/null
+++ b/flink-python/pyflink/datastream/formats/csv.py
@@ -0,0 +1,323 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Optional, cast
+
+from py4j.java_gateway import get_java_class
+from pyflink.datastream.connectors import StreamFormat
+from pyflink.java_gateway import get_gateway
+from pyflink.table.types import DataType, DataTypes, _to_java_data_type, RowType, NumericType
+from pyflink.util.java_utils import to_jarray
+
+
+class CsvSchema(object):
+ """
+    CsvSchema holds the schema information of a CSV file, corresponding to the Java
+ ``com.fasterxml.jackson.dataformat.csv.CsvSchema`` class.
+
+ .. versionadded:: 1.16.0
+ """
+
+ def __init__(self, j_schema, data_type: DataType):
+ self._j_schema = j_schema
+ self._data_type = data_type
+
+ @staticmethod
+ def builder() -> 'CsvSchemaBuilder':
+ """
+ Returns a :class:`CsvSchemaBuilder`.
+ """
+ return CsvSchemaBuilder()
+
+ def size(self):
+ return self._j_schema.size()
+
+ def __len__(self):
+ return self.size()
+
+ def __str__(self):
+ return self._j_schema.toString()
+
+ def __repr__(self):
+ return str(self)
+
+
+class CsvSchemaBuilder(object):
+ """
+    CsvSchemaBuilder is for building a :class:`CsvSchema`, corresponding to the Java
+ ``com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder`` class.
+
+ .. versionadded:: 1.16.0
+ """
+
+ def __init__(self):
+ jvm = get_gateway().jvm
+ self._j_schema_builder = jvm.org.apache.flink.shaded.jackson2.com.fasterxml.jackson \
+ .dataformat.csv.CsvSchema.builder()
+ self._fields = []
+
+ def build(self) -> 'CsvSchema':
+ """
+ Build the :class:`CsvSchema`.
+ """
+ return CsvSchema(self._j_schema_builder.build(), DataTypes.ROW(self._fields))
+
+ def add_array_column(self,
+ name: str,
+ separator: str = ';',
+ element_type: Optional[DataType] = DataTypes.STRING()) \
+ -> 'CsvSchemaBuilder':
+ """
+        Add an array column to the schema. The type of the elements can be specified via
+        ``element_type``, which should be a primitive type.
+
+ :param name: Name of the column.
+        :param separator: Text separator of array elements, defaults to ``;``.
+        :param element_type: DataType of array elements, defaults to ``DataTypes.STRING()``.
+ """
+ self._j_schema_builder.addArrayColumn(name, separator)
+ self._fields.append(DataTypes.FIELD(name, DataTypes.ARRAY(element_type)))
+ return self
+
+ def add_boolean_column(self, name: str) -> 'CsvSchemaBuilder':
+ """
+        Add a boolean column to the schema, with type ``DataTypes.BOOLEAN()``.
+
+ :param name: Name of the column.
+ """
+ self._j_schema_builder.addBooleanColumn(name)
+ self._fields.append(DataTypes.FIELD(name, DataTypes.BOOLEAN()))
+ return self
+
+ def add_number_column(self, name: str,
+ number_type: Optional[NumericType] = DataTypes.BIGINT()) \
+ -> 'CsvSchemaBuilder':
+ """
+        Add a number column to the schema. The numeric type can be specified via ``number_type``.
+
+ :param name: Name of the column.
+        :param number_type: DataType of the number, defaults to ``DataTypes.BIGINT()``.
+ """
+ self._j_schema_builder.addNumberColumn(name)
+ self._fields.append(DataTypes.FIELD(name, number_type))
+ return self
+
+ def add_string_column(self, name: str) -> 'CsvSchemaBuilder':
+ """
+        Add a string column to the schema, with type ``DataTypes.STRING()``.
+
+ :param name: Name of the column.
+ """
+ self._j_schema_builder.addColumn(name)
+ self._fields.append(DataTypes.FIELD(name, DataTypes.STRING()))
+ return self
+
+ def add_columns_from(self, schema: 'CsvSchema') -> 'CsvSchemaBuilder':
+ """
+        Add all columns of ``schema`` to the current schema.
+
+ :param schema: Another :class:`CsvSchema`.
+ """
+ self._j_schema_builder.addColumnsFrom(schema._j_schema)
+        for field in cast(RowType, schema._data_type):
+ self._fields.append(field)
+ return self
+
+ def clear_columns(self):
+ """
+ Delete all columns in the schema.
+ """
+ self._j_schema_builder.clearColumns()
+ self._fields.clear()
+ return self
+
+ def set_allow_comments(self, allow: bool = True):
+ """
+        Allow ``#``-prefixed comments in the CSV file.
+ """
+ self._j_schema_builder.setAllowComments(allow)
+ return self
+
+ def set_any_property_name(self, name: str):
+ self._j_schema_builder.setAnyPropertyName(name)
+ return self
+
+ def disable_array_element_separator(self):
+ """
+        Set the array element separator to ``""`` (the empty string).
+ """
+ self._j_schema_builder.disableArrayElementSeparator()
+ return self
+
+ def remove_array_element_separator(self, index: int):
+ """
+        Set the array element separator of the column at ``index`` to ``""``.
+ """
+ self._j_schema_builder.removeArrayElementSeparator(index)
+ return self
+
+ def set_array_element_separator(self, separator: str):
+ """
+        Set the global array element separator, defaults to ``;``.
+ """
+ self._j_schema_builder.setArrayElementSeparator(separator)
+ return self
+
+ def set_column_separator(self, char: str):
+ """
+        Set the column separator. ``char`` must be a single character, defaults to ``,``.
+ """
+ if len(char) != 1:
+ raise ValueError('Column separator must be a single char, got {}'.format(char))
+ self._j_schema_builder.setColumnSeparator(char)
+ return self
+
+ def disable_escape_char(self):
+ """
+        Disable escaping in the CSV file.
+ """
+ self._j_schema_builder.disableEscapeChar()
+ return self
+
+ def set_escape_char(self, char: str):
+ """
+        Set the escape char. ``char`` must be a single character; no escaping is used by default.
+ """
+ if len(char) != 1:
+ raise ValueError('Escape char must be a single char, got {}'.format(char))
+ self._j_schema_builder.setEscapeChar(char)
+ return self
+
+ def set_line_separator(self, separator: str):
+ """
+        Set the line separator, defaults to ``\\n``. This is only configurable for writing; when
+        reading, ``\\n``, ``\\r`` and ``\\r\\n`` are all recognized.
+ """
+ self._j_schema_builder.setLineSeparator(separator)
+ return self
+
+ def set_null_value(self, null_value: str):
+ """
+        Set the literal for null values, defaults to an empty sequence.
+ """
+        self._j_schema_builder.setNullValue(null_value)
+        return self
+
+ def disable_quote_char(self):
+ """
+ Disable quote char.
+ """
+ self._j_schema_builder.disableQuoteChar()
+ return self
+
+ def set_quote_char(self, char: str):
+ """
+        Set the quote char, defaults to ``"``.
+ """
+ if len(char) != 1:
+ raise ValueError('Quote char must be a single char, got {}'.format(char))
+ self._j_schema_builder.setQuoteChar(char)
+ return self
+
+ def set_skip_first_data_row(self, skip: bool = True):
+ """
+        Set whether to skip the first row of the CSV file.
+ """
+ self._j_schema_builder.setSkipFirstDataRow(skip)
+ return self
+
+ def set_strict_headers(self, strict: bool = True):
+ """
+        Set whether to use strict headers, which checks that the column names in the header are
+        consistent with the schema.
+ """
+ self._j_schema_builder.setStrictHeaders(strict)
+ return self
+
+ def set_use_header(self, use: bool = True):
+ """
+        Set whether to read the header.
+ """
+ self._j_schema_builder.setUseHeader(use)
+ return self
+
+ def size(self):
+ return len(self._fields)
+
+ def __len__(self):
+ return self.size()
+
+
+class CsvReaderFormat(StreamFormat):
+ """
+    The :class:`StreamFormat` for reading CSV files.
+
+ Example:
+ ::
+
+ >>> schema = CsvSchema.builder() \\
+ ... .add_number_column('id', number_type=DataTypes.INT()) \\
+ ... .add_string_column('name') \\
+ ... .add_array_column('list', ',', element_type=DataTypes.STRING()) \\
+ ... .set_column_separator('|') \\
+ ... .set_escape_char('\\\\') \\
+ ... .set_use_header() \\
+ ... .set_strict_headers() \\
+ ... .build()
+ >>> source = FileSource.for_record_stream_format(
+ ... CsvReaderFormat.for_schema(schema), CSV_FILE_PATH).build()
+ >>> ds = env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')
+ >>> # the type of records is Types.ROW_NAMED(['id', 'name', 'list'],
+ >>> # [Types.INT(), Types.STRING(), Types.LIST(Types.STRING())])
+
+ .. versionadded:: 1.16.0
+ """
+
+ def __init__(self, j_csv_format):
+ super().__init__(j_csv_format)
+
+ @staticmethod
+ def for_schema(schema: 'CsvSchema') -> 'CsvReaderFormat':
+ """
+        Builds a :class:`CsvReaderFormat` from the given :class:`CsvSchema`.
+ """
+ jvm = get_gateway().jvm
+ jackson = jvm.org.apache.flink.shaded.jackson2.com.fasterxml.jackson
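+        # the Java CsvReaderFormat constructor is not publicly accessible, so look it up via
+        # reflection and invoke it with a row converter derived from the schema's data type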
+ constructor = get_java_class(jvm.org.apache.flink.formats.csv.CsvReaderFormat) \
+ .getDeclaredConstructor(
+ to_jarray(jvm.Class, [
+ get_java_class(jackson.dataformat.csv.CsvMapper),
+ get_java_class(jackson.dataformat.csv.CsvSchema),
+ get_java_class(jvm.Class),
+ get_java_class(jvm.org.apache.flink.formats.common.Converter),
+ get_java_class(jvm.org.apache.flink.api.common.typeinfo.TypeInformation),
+ get_java_class(jvm.boolean)
+ ])
+ )
+ constructor.setAccessible(True)
+ j_csv_format = constructor.newInstance(
+ to_jarray(jvm.Object, [
+ jackson.dataformat.csv.CsvMapper(),
+ schema._j_schema,
+ get_java_class(jackson.databind.JsonNode),
+ jvm.org.apache.flink.formats.csv.CsvToRowDataConverters(False).createRowConverter(
+ _to_java_data_type(schema._data_type).getLogicalType(), True),
+ jvm.org.apache.flink.table.runtime.typeutils.InternalTypeInfo.of(
+ _to_java_data_type(schema._data_type).getLogicalType()),
+ False
+ ])
+ )
+ return CsvReaderFormat(j_csv_format)
diff --git a/flink-python/pyflink/datastream/formats/parquet.py b/flink-python/pyflink/datastream/formats/parquet.py
index acc1739bede..dea57894f1e 100644
--- a/flink-python/pyflink/datastream/formats/parquet.py
+++ b/flink-python/pyflink/datastream/formats/parquet.py
@@ -24,8 +24,8 @@ from pyflink.table.types import RowType, _to_java_data_type
class AvroParquetReaders(object):
"""
- A convenience builder to create AvroParquetRecordFormat instances for the different kinds of
- Avro record types. Only GenericRecord is supported in PyFlink.
+    A convenience builder to create a reader format that reads individual Avro records from a
+ Parquet stream. Only GenericRecord is supported in PyFlink.
.. versionadded:: 1.16.0
"""
diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py
index 2ee5307aa07..eabaa25566d 100644
--- a/flink-python/pyflink/pyflink_gateway_server.py
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -215,7 +215,6 @@ def construct_test_classpath():
test_jar_patterns = [
"flink-runtime/target/flink-runtime*tests.jar",
"flink-streaming-java/target/flink-streaming-java*tests.jar",
- "flink-formats/flink-csv/target/flink-csv*.jar",
"flink-formats/flink-sql-avro/target/flink-sql-avro*.jar",
"flink-formats/flink-sql-parquet/target/flink-sql-parquet*.jar",
"flink-formats/flink-json/target/flink-json*.jar",