Posted to commits@flink.apache.org by ch...@apache.org on 2022/07/26 10:51:23 UTC
[flink] branch master updated: [FLINK-28624][csv] Accept mapper/schema factories
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1dee35a9966 [FLINK-28624][csv] Accept mapper/schema factories
1dee35a9966 is described below
commit 1dee35a9966065593aa5dfc618f5fec17fb80890
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Jul 26 12:51:16 2022 +0200
[FLINK-28624][csv] Accept mapper/schema factories
---
.../docs/connectors/datastream/filesystem.md | 4 +-
.../docs/connectors/datastream/formats/csv.md | 12 +--
.../docs/connectors/datastream/filesystem.md | 4 +-
.../docs/connectors/datastream/formats/csv.md | 12 +--
.../flink/formats/csv/CsvFileFormatFactory.java | 4 +-
.../apache/flink/formats/csv/CsvReaderFormat.java | 75 +++++++++-----
.../flink/formats/csv/CsvReaderFormatTest.java | 115 +++++++++++++++++++++
.../flink/formats/csv/DataStreamCsvITCase.java | 12 ++-
flink-python/pom.xml | 6 ++
flink-python/pyflink/datastream/formats/csv.py | 30 +-----
.../flink/formats/csv/CsvReaderFormatFactory.java | 45 ++++++++
11 files changed, 245 insertions(+), 74 deletions(-)
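For orientation before the diff: the change replaces the pre-built `CsvMapper`/`CsvSchema` arguments of `CsvReaderFormat.forSchema` with a mapper factory and a schema generator. A minimal before/after sketch, using an illustrative `CityPojo`; the authoritative signatures and documentation examples follow in the diff below:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.csv.CsvReaderFormat;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class ForSchemaMigrationSketch {

    /** Illustrative POJO; the docs example uses a richer CityPojo with @JsonPropertyOrder. */
    public static class CityPojo {
        public String city;
        public long population;
    }

    public static void main(String[] args) {
        // Before (now deprecated): pre-created mapper and schema are captured directly,
        // so both must be serializable.
        CsvMapper mapper = new CsvMapper();
        CsvSchema schema = mapper.schemaFor(CityPojo.class).withColumnSeparator('|');
        CsvReaderFormat<CityPojo> oldStyle =
                CsvReaderFormat.forSchema(mapper, schema, TypeInformation.of(CityPojo.class));

        // After: pass a mapper factory and a schema generator; the mapper and schema
        // are only created when the reader is instantiated.
        CsvReaderFormat<CityPojo> newStyle =
                CsvReaderFormat.forSchema(
                        () -> new CsvMapper(),
                        m -> m.schemaFor(CityPojo.class).withColumnSeparator('|'),
                        TypeInformation.of(CityPojo.class));
    }
}
```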
diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md b/docs/content.zh/docs/connectors/datastream/filesystem.md
index 3c4b0cea567..cf8e6e1dae2 100644
--- a/docs/content.zh/docs/connectors/datastream/filesystem.md
+++ b/docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -161,8 +161,8 @@ FileSource<SomePojo> source =
如果需要对 CSV 模式或解析选项进行更细粒度的控制,可以使用 `CsvReaderFormat` 的更底层的 `forSchema` 静态工厂方法:
```java
-CsvReaderFormat<T> forSchema(CsvMapper mapper,
- CsvSchema schema,
+CsvReaderFormat<T> forSchema(Supplier<CsvMapper> mapperFactory,
+ Function<CsvMapper, CsvSchema> schemaGenerator,
TypeInformation<T> typeInformation)
```
diff --git a/docs/content.zh/docs/connectors/datastream/formats/csv.md b/docs/content.zh/docs/connectors/datastream/formats/csv.md
index 870dde3488d..dbf8bd6be6c 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/csv.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/csv.md
@@ -60,8 +60,8 @@ Note: you might need to add `@JsonPropertyOrder({field1, field2, ...})` annotati
If you need more fine-grained control over the CSV schema or the parsing options, use the more low-level `forSchema` static factory method of `CsvReaderFormat`:
```java
-CsvReaderFormat<T> forSchema(CsvMapper mapper,
- CsvSchema schema,
+CsvReaderFormat<T> forSchema(Supplier<CsvMapper> mapperFactory,
+ Function<CsvMapper, CsvSchema> schemaGenerator,
TypeInformation<T> typeInformation)
```
Below is an example of reading a POJO with a custom columns' separator:
@@ -80,15 +80,14 @@ Below is an example of reading a POJO with a custom columns' separator:
public long population;
}
-CsvMapper mapper = new CsvMapper();
-CsvSchema schema =
+Function<CsvMapper, CsvSchema> schemaGenerator = mapper ->
mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');
CsvReaderFormat<CityPojo> csvFormat =
- CsvReaderFormat.forSchema(mapper, schema, TypeInformation.of(CityPojo.class));
+ CsvReaderFormat.forSchema(() -> new CsvMapper(), schemaGenerator, TypeInformation.of(CityPojo.class));
FileSource<CityPojo> source =
- FileSource.forRecordStreamFormat(csvFormat,Path.fromLocalFile(...)).build();
+ FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
```
The corresponding CSV file:
```
@@ -105,7 +104,6 @@ public static class ComplexPojo {
CsvReaderFormat<ComplexPojo> csvFormat =
CsvReaderFormat.forSchema(
- new CsvMapper(),
CsvSchema.builder()
.addColumn(
new CsvSchema.Column(0, "id", CsvSchema.ColumnType.NUMBER))
diff --git a/docs/content/docs/connectors/datastream/filesystem.md b/docs/content/docs/connectors/datastream/filesystem.md
index f9f0f7ae05b..ce2a6c50824 100644
--- a/docs/content/docs/connectors/datastream/filesystem.md
+++ b/docs/content/docs/connectors/datastream/filesystem.md
@@ -157,8 +157,8 @@ The schema for CSV parsing, in this case, is automatically derived based on the
If you need more fine-grained control over the CSV schema or the parsing options, use the more low-level `forSchema` static factory method of `CsvReaderFormat`:
```java
-CsvReaderFormat<T> forSchema(CsvMapper mapper,
- CsvSchema schema,
+CsvReaderFormat<T> forSchema(Supplier<CsvMapper> mapperFactory,
+ Function<CsvMapper, CsvSchema> schemaGenerator,
TypeInformation<T> typeInformation)
```
diff --git a/docs/content/docs/connectors/datastream/formats/csv.md b/docs/content/docs/connectors/datastream/formats/csv.md
index 870dde3488d..dbf8bd6be6c 100644
--- a/docs/content/docs/connectors/datastream/formats/csv.md
+++ b/docs/content/docs/connectors/datastream/formats/csv.md
@@ -60,8 +60,8 @@ Note: you might need to add `@JsonPropertyOrder({field1, field2, ...})` annotati
If you need more fine-grained control over the CSV schema or the parsing options, use the more low-level `forSchema` static factory method of `CsvReaderFormat`:
```java
-CsvReaderFormat<T> forSchema(CsvMapper mapper,
- CsvSchema schema,
+CsvReaderFormat<T> forSchema(Supplier<CsvMapper> mapperFactory,
+ Function<CsvMapper, CsvSchema> schemaGenerator,
TypeInformation<T> typeInformation)
```
Below is an example of reading a POJO with a custom columns' separator:
@@ -80,15 +80,14 @@ Below is an example of reading a POJO with a custom columns' separator:
public long population;
}
-CsvMapper mapper = new CsvMapper();
-CsvSchema schema =
+Function<CsvMapper, CsvSchema> schemaGenerator = mapper ->
mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');
CsvReaderFormat<CityPojo> csvFormat =
- CsvReaderFormat.forSchema(mapper, schema, TypeInformation.of(CityPojo.class));
+ CsvReaderFormat.forSchema(() -> new CsvMapper(), schemaGenerator, TypeInformation.of(CityPojo.class));
FileSource<CityPojo> source =
- FileSource.forRecordStreamFormat(csvFormat,Path.fromLocalFile(...)).build();
+ FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
```
The corresponding CSV file:
```
@@ -105,7 +104,6 @@ public static class ComplexPojo {
CsvReaderFormat<ComplexPojo> csvFormat =
CsvReaderFormat.forSchema(
- new CsvMapper(),
CsvSchema.builder()
.addColumn(
new CsvSchema.Column(0, "id", CsvSchema.ColumnType.NUMBER))
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
index e59ec63794f..2054c182a01 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
@@ -138,8 +138,8 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
.createRowConverter(projectedRowType, true);
CsvReaderFormat<RowData> csvReaderFormat =
new CsvReaderFormat<>(
- new CsvMapper(),
- schema,
+ () -> new CsvMapper(),
+ ignored -> schema,
JsonNode.class,
converter,
context.createTypeInformation(projectedDataType),
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
index 1ce00fd4d70..c2d14008cb9 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
@@ -25,6 +25,8 @@ import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.formats.common.Converter;
+import org.apache.flink.util.function.SerializableFunction;
+import org.apache.flink.util.function.SerializableSupplier;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
@@ -33,6 +35,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.Csv
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.function.Function;
+import java.util.function.Supplier;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -56,11 +60,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* low-level {@code forSchema} static factory method based on the {@code Jackson} library utilities:
*
* <pre>{@code
- * CsvMapper mapper = new CsvMapper();
- * CsvSchema schema = mapper.schemaFor(SomePojo.class)
+ * Function<CsvMapper, CsvSchema> schemaGenerator =
+ * mapper -> mapper.schemaFor(SomePojo.class)
* .withColumnSeparator('|');
* CsvReaderFormat<SomePojo> csvFormat =
- * CsvReaderFormat.forSchema(mapper,schema, TypeInformation.of(SomePojo.class));
+ * CsvReaderFormat.forSchema(() -> new CsvMapper(), schemaGenerator, TypeInformation.of(SomePojo.class));
* FileSource<SomePojo> source =
* FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(filesPath)).build();
* }</pre>
@@ -72,8 +76,8 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
private static final long serialVersionUID = 1L;
- private final CsvMapper mapper;
- private final CsvSchema schema;
+ private final SerializableSupplier<CsvMapper> mapperFactory;
+ private final SerializableFunction<CsvMapper, CsvSchema> schemaGenerator;
private final Class<Object> rootType;
private final Converter<Object, T, Void> converter;
private final TypeInformation<T> typeInformation;
@@ -81,14 +85,14 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
@SuppressWarnings("unchecked")
<R> CsvReaderFormat(
- CsvMapper mapper,
- CsvSchema schema,
+ SerializableSupplier<CsvMapper> mapperFactory,
+ SerializableFunction<CsvMapper, CsvSchema> schemaGenerator,
Class<R> rootType,
Converter<R, T, Void> converter,
TypeInformation<T> typeInformation,
boolean ignoreParseErrors) {
- this.mapper = checkNotNull(mapper);
- this.schema = checkNotNull(schema);
+ this.mapperFactory = checkNotNull(mapperFactory);
+ this.schemaGenerator = checkNotNull(schemaGenerator);
this.rootType = (Class<Object>) checkNotNull(rootType);
this.typeInformation = checkNotNull(typeInformation);
this.converter = (Converter<Object, T, Void>) checkNotNull(converter);
@@ -104,23 +108,45 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
*/
public static <T> CsvReaderFormat<T> forSchema(
CsvSchema schema, TypeInformation<T> typeInformation) {
- return forSchema(new CsvMapper(), schema, typeInformation);
+ return forSchema(() -> new CsvMapper(), ignored -> schema, typeInformation);
}
/**
- * Builds a new {@code CsvReaderFormat} using a {@code CsvSchema} and a pre-created {@code
- * CsvMapper}.
+ * @deprecated This method is limited to serializable {@link CsvMapper CsvMappers}, preventing
+ * the usage of certain Jackson modules (like the {@link
+ * org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
+ * Java 8 Date/Time Serializers}). Use {@link #forSchema(Supplier, Function,
+ * TypeInformation)} instead.
+ */
+ @Deprecated
+ public static <T> CsvReaderFormat<T> forSchema(
+ CsvMapper mapper, CsvSchema schema, TypeInformation<T> typeInformation) {
+ return new CsvReaderFormat<>(
+ () -> mapper,
+ ignored -> schema,
+ typeInformation.getTypeClass(),
+ (value, context) -> value,
+ typeInformation,
+ false);
+ }
+
+ /**
+ * Builds a new {@code CsvReaderFormat} using a {@code CsvSchema} generator and {@code
+ * CsvMapper} factory.
*
- * @param mapper The pre-created {@code CsvMapper}.
- * @param schema The Jackson CSV schema configured for parsing specific CSV files.
+ * @param mapperFactory The factory creating the {@code CsvMapper}.
+ * @param schemaGenerator A generator that creates and configures the Jackson CSV schema for
+ * parsing specific CSV files, from a mapper created by the mapper factory.
* @param typeInformation The Flink type descriptor of the returned elements.
* @param <T> The type of the returned elements.
*/
public static <T> CsvReaderFormat<T> forSchema(
- CsvMapper mapper, CsvSchema schema, TypeInformation<T> typeInformation) {
+ SerializableSupplier<CsvMapper> mapperFactory,
+ SerializableFunction<CsvMapper, CsvSchema> schemaGenerator,
+ TypeInformation<T> typeInformation) {
return new CsvReaderFormat<>(
- mapper,
- schema,
+ mapperFactory,
+ schemaGenerator,
typeInformation.getTypeClass(),
(value, context) -> value,
typeInformation,
@@ -136,10 +162,9 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
* @param <T> The type of the returned elements.
*/
public static <T> CsvReaderFormat<T> forPojo(Class<T> pojoType) {
- CsvMapper mapper = new CsvMapper();
return forSchema(
- mapper,
- mapper.schemaFor(pojoType).withoutQuoteChar(),
+ () -> new CsvMapper(),
+ mapper -> mapper.schemaFor(pojoType).withoutQuoteChar(),
TypeInformation.of(pojoType));
}
@@ -149,8 +174,8 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
*/
public CsvReaderFormat<T> withIgnoreParseErrors() {
return new CsvReaderFormat<T>(
- this.mapper,
- this.schema,
+ this.mapperFactory,
+ this.schemaGenerator,
this.rootType,
this.converter,
this.typeInformation,
@@ -160,8 +185,12 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
@Override
public StreamFormat.Reader<T> createReader(Configuration config, FSDataInputStream stream)
throws IOException {
+ final CsvMapper csvMapper = mapperFactory.get();
return new Reader<>(
- mapper.readerFor(rootType).with(schema).readValues(stream),
+ csvMapper
+ .readerFor(rootType)
+ .with(schemaGenerator.apply(csvMapper))
+ .readValues(stream),
converter,
ignoreParseErrors);
}
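The deprecation note in the hunk above states the motivation: a pre-created mapper must be serializable, which rules out Jackson modules such as the `JavaTimeModule`. A minimal sketch of the call-site this enables, reusing the `SomePojo` placeholder from the class javadoc (the new `CsvReaderFormatTest` below exercises the same pattern):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.csv.CsvReaderFormat;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

public class MapperFactorySketch {

    /** Minimal stand-in for the SomePojo referenced in the javadoc. */
    public static class SomePojo {
        public String name;
    }

    public static CsvReaderFormat<SomePojo> build() {
        return CsvReaderFormat.forSchema(
                () -> {
                    CsvMapper csvMapper = new CsvMapper();
                    // The JavaTimeModule itself is not serializable; only this factory
                    // lambda is serialized, and the mapper is created lazily in
                    // createReader() on the reading task.
                    csvMapper.registerModule(new JavaTimeModule());
                    return csvMapper;
                },
                mapper -> mapper.schemaFor(SomePojo.class),
                TypeInformation.of(SomePojo.class));
    }
}
```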
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvReaderFormatTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvReaderFormatTest.java
new file mode 100644
index 00000000000..f6b04748f98
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvReaderFormatTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.io.InputStreamFSInputWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class CsvReaderFormatTest {
+
+ @Test
+ void testForPojoSerializability() throws IOException, ClassNotFoundException {
+ final CsvReaderFormat<Pojo> format = CsvReaderFormat.forPojo(Pojo.class);
+
+ final byte[] bytes = InstantiationUtil.serializeObject(format);
+ InstantiationUtil.deserializeObject(bytes, CsvReaderFormatTest.class.getClassLoader());
+ }
+
+ @Test
+ void testForSchemaSerializability() throws IOException, ClassNotFoundException {
+ final CsvSchema schema = new CsvMapper().schemaFor(Pojo.class);
+ final CsvReaderFormat<Pojo> format =
+ CsvReaderFormat.forSchema(schema, TypeInformation.of(Pojo.class));
+
+ final byte[] bytes = InstantiationUtil.serializeObject(format);
+ InstantiationUtil.deserializeObject(bytes, CsvReaderFormatTest.class.getClassLoader());
+ }
+
+ @Test
+ void testForSchemaWithMapperSerializability() throws IOException, ClassNotFoundException {
+ final CsvReaderFormat<Pojo> format =
+ CsvReaderFormat.forSchema(
+ () -> new CsvMapper(),
+ mapper -> mapper.schemaFor(Pojo.class),
+ TypeInformation.of(Pojo.class));
+
+ final byte[] bytes = InstantiationUtil.serializeObject(format);
+ InstantiationUtil.deserializeObject(bytes, CsvReaderFormatTest.class.getClassLoader());
+ }
+
+ /**
+ * Verifies that we don't eagerly use the mapper factory in the constructor to initialize
+ * some non-transient field.
+ */
+ @Test
+ void testForSchemaWithMapperSerializabilityWithUnserializableMapper()
+ throws IOException, ClassNotFoundException {
+ final CsvReaderFormat<Pojo> format =
+ CsvReaderFormat.forSchema(
+ () -> {
+ final CsvMapper csvMapper = new CsvMapper();
+ // this module is not serializable
+ csvMapper.registerModule(new JavaTimeModule());
+ return csvMapper;
+ },
+ mapper -> mapper.schemaFor(Pojo.class),
+ TypeInformation.of(Pojo.class));
+
+ final byte[] bytes = InstantiationUtil.serializeObject(format);
+ InstantiationUtil.deserializeObject(bytes, CsvReaderFormatTest.class.getClassLoader());
+ }
+
+ @Test
+ void testCreatedMapperPassedToSchemaFunction() throws IOException, ClassNotFoundException {
+ final CsvMapper csvMapper = new CsvMapper();
+
+ AtomicReference<CsvMapper> passedMapper = new AtomicReference<>();
+
+ final CsvReaderFormat<Pojo> format =
+ CsvReaderFormat.forSchema(
+ () -> csvMapper,
+ mapper -> {
+ passedMapper.set(csvMapper);
+ return mapper.schemaFor(Pojo.class);
+ },
+ TypeInformation.of(Pojo.class));
+
+ format.createReader(
+ new Configuration(),
+ new InputStreamFSInputWrapper(new ByteArrayInputStream(new byte[0])));
+ assertThat(passedMapper.get()).isSameAs(csvMapper);
+ }
+
+ public static class Pojo {
+ public int x;
+ }
+}
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
index 97fd78459fc..ea073432116 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
@@ -161,12 +161,14 @@ public class DataStreamCsvITCase {
public void testCsvReaderFormatFromSchema() throws Exception {
writeFile(outDir, "data.csv", CSV_LINES_PIPE_SEPARATED);
- CsvMapper mapper = new CsvMapper();
- CsvSchema schema =
- mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');
-
final CsvReaderFormat<CityPojo> csvFormat =
- CsvReaderFormat.forSchema(mapper, schema, TypeInformation.of(CityPojo.class));
+ CsvReaderFormat.forSchema(
+ () -> new CsvMapper(),
+ mapper ->
+ mapper.schemaFor(CityPojo.class)
+ .withoutQuoteChar()
+ .withColumnSeparator('|'),
+ TypeInformation.of(CityPojo.class));
final List<CityPojo> result = initializeSourceAndReadData(outDir, csvFormat);
assertThat(Arrays.asList(POJOS)).isEqualTo(result);
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 94133e2288b..fa5fe677285 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -77,6 +77,12 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-csv</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<!-- Beam dependencies -->
diff --git a/flink-python/pyflink/datastream/formats/csv.py b/flink-python/pyflink/datastream/formats/csv.py
index 82f405da191..2dc00f59d2d 100644
--- a/flink-python/pyflink/datastream/formats/csv.py
+++ b/flink-python/pyflink/datastream/formats/csv.py
@@ -17,11 +17,9 @@
################################################################################
from typing import Optional, cast
-from py4j.java_gateway import get_java_class
from pyflink.datastream.connectors import StreamFormat
from pyflink.java_gateway import get_gateway
from pyflink.table.types import DataType, DataTypes, _to_java_data_type, RowType, NumericType
-from pyflink.util.java_utils import to_jarray
class CsvSchema(object):
@@ -295,29 +293,9 @@ class CsvReaderFormat(StreamFormat):
Builds a :class:`CsvReaderFormat` using `CsvSchema`.
"""
jvm = get_gateway().jvm
- jackson = jvm.org.apache.flink.shaded.jackson2.com.fasterxml.jackson
- constructor = get_java_class(jvm.org.apache.flink.formats.csv.CsvReaderFormat) \
- .getDeclaredConstructor(
- to_jarray(jvm.Class, [
- get_java_class(jackson.dataformat.csv.CsvMapper),
- get_java_class(jackson.dataformat.csv.CsvSchema),
- get_java_class(jvm.Class),
- get_java_class(jvm.org.apache.flink.formats.common.Converter),
- get_java_class(jvm.org.apache.flink.api.common.typeinfo.TypeInformation),
- get_java_class(jvm.boolean)
- ])
- )
- constructor.setAccessible(True)
- j_csv_format = constructor.newInstance(
- to_jarray(jvm.Object, [
- jackson.dataformat.csv.CsvMapper(),
+ j_csv_format = jvm.org.apache.flink.formats.csv.CsvReaderFormatFactory \
+ .createCsvReaderFormat(
schema._j_schema,
- get_java_class(jackson.databind.JsonNode),
- jvm.org.apache.flink.formats.csv.CsvToRowDataConverters(False).createRowConverter(
- _to_java_data_type(schema._data_type).getLogicalType(), True),
- jvm.org.apache.flink.table.runtime.typeutils.InternalTypeInfo.of(
- _to_java_data_type(schema._data_type).getLogicalType()),
- False
- ])
- )
+ _to_java_data_type(schema._data_type)
+ )
return CsvReaderFormat(j_csv_format)
diff --git a/flink-python/src/main/java/org/apache/flink/formats/csv/CsvReaderFormatFactory.java b/flink-python/src/main/java/org/apache/flink/formats/csv/CsvReaderFormatFactory.java
new file mode 100644
index 00000000000..f5c845a6190
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/formats/csv/CsvReaderFormatFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+/** Util for creating a {@link CsvReaderFormat}. */
+public class CsvReaderFormatFactory {
+ public static CsvReaderFormat createCsvReaderFormat(CsvSchema schema, DataType dataType) {
+ Preconditions.checkArgument(dataType.getLogicalType() instanceof RowType);
+
+ return new CsvReaderFormat(
+ () -> new CsvMapper(),
+ ignored -> schema,
+ JsonNode.class,
+ new CsvToRowDataConverters(false)
+ .createRowConverter(
+ LogicalTypeUtils.toRowType(dataType.getLogicalType()), true),
+ InternalTypeInfo.of(dataType.getLogicalType()),
+ false);
+ }
+}
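For completeness, a hedged sketch of how the new `CsvReaderFormatFactory` bridge might be invoked from Java; the schema and row type here are illustrative, since in practice PyFlink supplies them from `CsvSchema._j_schema` and `_to_java_data_type`:

```java
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.formats.csv.CsvReaderFormatFactory;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class CsvReaderFormatFactorySketch {
    public static void main(String[] args) {
        // Illustrative single-column schema and a matching row type.
        CsvSchema schema =
                CsvSchema.builder()
                        .addColumn("id", CsvSchema.ColumnType.NUMBER)
                        .build();
        DataType dataType = DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT()));

        // The factory supplies the () -> new CsvMapper() supplier and a fixed-schema
        // generator internally, so the resulting format is serializable.
        CsvReaderFormat<?> csvFormat =
                CsvReaderFormatFactory.createCsvReaderFormat(schema, dataType);
    }
}
```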