Posted to commits@flink.apache.org by ch...@apache.org on 2022/07/26 10:51:23 UTC

[flink] branch master updated: [FLINK-28624][csv] Accept mapper/schema factories

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dee35a9966 [FLINK-28624][csv] Accept mapper/schema factories
1dee35a9966 is described below

commit 1dee35a9966065593aa5dfc618f5fec17fb80890
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Jul 26 12:51:16 2022 +0200

    [FLINK-28624][csv] Accept mapper/schema factories
---
 .../docs/connectors/datastream/filesystem.md       |   4 +-
 .../docs/connectors/datastream/formats/csv.md      |  12 +--
 .../docs/connectors/datastream/filesystem.md       |   4 +-
 .../docs/connectors/datastream/formats/csv.md      |  12 +--
 .../flink/formats/csv/CsvFileFormatFactory.java    |   4 +-
 .../apache/flink/formats/csv/CsvReaderFormat.java  |  75 +++++++++-----
 .../flink/formats/csv/CsvReaderFormatTest.java     | 115 +++++++++++++++++++++
 .../flink/formats/csv/DataStreamCsvITCase.java     |  12 ++-
 flink-python/pom.xml                               |   6 ++
 flink-python/pyflink/datastream/formats/csv.py     |  30 +-----
 .../flink/formats/csv/CsvReaderFormatFactory.java  |  45 ++++++++
 11 files changed, 245 insertions(+), 74 deletions(-)
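
In short: forSchema now takes a factory for the CsvMapper and a generator for the
CsvSchema instead of pre-created instances, so the mapper is only created when a
reader is opened and never has to be serialized itself. A minimal sketch of the new
call pattern (CityPojo is the example POJO from the documentation changes below):

    CsvReaderFormat<CityPojo> csvFormat =
            CsvReaderFormat.forSchema(
                    () -> new CsvMapper(),                // invoked lazily, per reader
                    mapper -> mapper.schemaFor(CityPojo.class)
                            .withoutQuoteChar()
                            .withColumnSeparator('|'),    // schema built from that mapper
                    TypeInformation.of(CityPojo.class));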

diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md b/docs/content.zh/docs/connectors/datastream/filesystem.md
index 3c4b0cea567..cf8e6e1dae2 100644
--- a/docs/content.zh/docs/connectors/datastream/filesystem.md
+++ b/docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -161,8 +161,8 @@ FileSource<SomePojo> source =
 如果需要对 CSV 模式或解析选项进行更细粒度的控制,可以使用 `CsvReaderFormat` 的更底层的 `forSchema` 静态工厂方法:
 
 ```java
-CsvReaderFormat<T> forSchema(CsvMapper mapper, 
-                             CsvSchema schema, 
+CsvReaderFormat<T> forSchema(Supplier<CsvMapper> mapperFactory, 
+                             Function<CsvMapper, CsvSchema> schemaGenerator, 
                              TypeInformation<T> typeInformation) 
 ```
 
diff --git a/docs/content.zh/docs/connectors/datastream/formats/csv.md b/docs/content.zh/docs/connectors/datastream/formats/csv.md
index 870dde3488d..dbf8bd6be6c 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/csv.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/csv.md
@@ -60,8 +60,8 @@ Note: you might need to add `@JsonPropertyOrder({field1, field2, ...})` annotati
 If you need more fine-grained control over the CSV schema or the parsing options, use the lower-level `forSchema` static factory method of `CsvReaderFormat`:
 
 ```java
-CsvReaderFormat<T> forSchema(CsvMapper mapper, 
-                             CsvSchema schema, 
+CsvReaderFormat<T> forSchema(Supplier<CsvMapper> mapperFactory, 
+                             Function<CsvMapper, CsvSchema> schemaGenerator, 
                              TypeInformation<T> typeInformation) 
 ```
 Below is an example of reading a POJO with a custom column separator:
@@ -80,15 +80,14 @@ Below is an example of reading a POJO with a custom columns' separator:
     public long population;
 }
 
-CsvMapper mapper = new CsvMapper();
-CsvSchema schema =
+SerializableFunction<CsvMapper, CsvSchema> schemaGenerator = mapper ->
         mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');
 
 CsvReaderFormat<CityPojo> csvFormat =
-        CsvReaderFormat.forSchema(mapper, schema, TypeInformation.of(CityPojo.class));
+        CsvReaderFormat.forSchema(() -> new CsvMapper(), schemaGenerator, TypeInformation.of(CityPojo.class));
 
 FileSource<CityPojo> source =
-        FileSource.forRecordStreamFormat(csvFormat,Path.fromLocalFile(...)).build();
+        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
 ```
 The corresponding CSV file:
 ```
@@ -105,7 +104,6 @@ public static class ComplexPojo {
 
 CsvReaderFormat<ComplexPojo> csvFormat =
         CsvReaderFormat.forSchema(
-                new CsvMapper(),
                 CsvSchema.builder()
                         .addColumn(
                                 new CsvSchema.Column(0, "id", CsvSchema.ColumnType.NUMBER))
diff --git a/docs/content/docs/connectors/datastream/filesystem.md b/docs/content/docs/connectors/datastream/filesystem.md
index f9f0f7ae05b..ce2a6c50824 100644
--- a/docs/content/docs/connectors/datastream/filesystem.md
+++ b/docs/content/docs/connectors/datastream/filesystem.md
@@ -157,8 +157,8 @@ The schema for CSV parsing, in this case, is automatically derived based on the
 If you need more fine-grained control over the CSV schema or the parsing options, use the lower-level `forSchema` static factory method of `CsvReaderFormat`:
 
 ```java
-CsvReaderFormat<T> forSchema(CsvMapper mapper, 
-                             CsvSchema schema, 
+CsvReaderFormat<T> forSchema(Supplier<CsvMapper> mapperFactory, 
+                             Function<CsvMapper, CsvSchema> schemaGenerator, 
                              TypeInformation<T> typeInformation) 
 ```
 
diff --git a/docs/content/docs/connectors/datastream/formats/csv.md b/docs/content/docs/connectors/datastream/formats/csv.md
index 870dde3488d..dbf8bd6be6c 100644
--- a/docs/content/docs/connectors/datastream/formats/csv.md
+++ b/docs/content/docs/connectors/datastream/formats/csv.md
@@ -60,8 +60,8 @@ Note: you might need to add `@JsonPropertyOrder({field1, field2, ...})` annotati
 If you need more fine-grained control over the CSV schema or the parsing options, use the lower-level `forSchema` static factory method of `CsvReaderFormat`:
 
 ```java
-CsvReaderFormat<T> forSchema(CsvMapper mapper, 
-                             CsvSchema schema, 
+CsvReaderFormat<T> forSchema(Supplier<CsvMapper> mapperFactory, 
+                             Function<CsvMapper, CsvSchema> schemaGenerator, 
                              TypeInformation<T> typeInformation) 
 ```
 Below is an example of reading a POJO with a custom column separator:
@@ -80,15 +80,14 @@ Below is an example of reading a POJO with a custom columns' separator:
     public long population;
 }
 
-CsvMapper mapper = new CsvMapper();
-CsvSchema schema =
+SerializableFunction<CsvMapper, CsvSchema> schemaGenerator = mapper ->
         mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');
 
 CsvReaderFormat<CityPojo> csvFormat =
-        CsvReaderFormat.forSchema(mapper, schema, TypeInformation.of(CityPojo.class));
+        CsvReaderFormat.forSchema(() -> new CsvMapper(), schemaGenerator, TypeInformation.of(CityPojo.class));
 
 FileSource<CityPojo> source =
-        FileSource.forRecordStreamFormat(csvFormat,Path.fromLocalFile(...)).build();
+        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
 ```
 The corresponding CSV file:
 ```
@@ -105,7 +104,6 @@ public static class ComplexPojo {
 
 CsvReaderFormat<ComplexPojo> csvFormat =
         CsvReaderFormat.forSchema(
-                new CsvMapper(),
                 CsvSchema.builder()
                         .addColumn(
                                 new CsvSchema.Column(0, "id", CsvSchema.ColumnType.NUMBER))
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
index e59ec63794f..2054c182a01 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
@@ -138,8 +138,8 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
                                     .createRowConverter(projectedRowType, true);
             CsvReaderFormat<RowData> csvReaderFormat =
                     new CsvReaderFormat<>(
-                            new CsvMapper(),
-                            schema,
+                            () -> new CsvMapper(),
+                            ignored -> schema,
                             JsonNode.class,
                             converter,
                             context.createTypeInformation(projectedDataType),
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
index 1ce00fd4d70..c2d14008cb9 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
@@ -25,6 +25,8 @@ import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
 import org.apache.flink.connector.file.src.reader.StreamFormat;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.formats.common.Converter;
+import org.apache.flink.util.function.SerializableFunction;
+import org.apache.flink.util.function.SerializableSupplier;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
@@ -33,6 +35,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.Csv
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -56,11 +60,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * low-level {@code forSchema} static factory method based on the {@code Jackson} library utilities:
  *
  * <pre>{@code
- * CsvMapper mapper = new CsvMapper();
- * CsvSchema schema = mapper.schemaFor(SomePojo.class)
+ * SerializableFunction<CsvMapper, CsvSchema> schemaGenerator =
+ *          mapper -> mapper.schemaFor(SomePojo.class)
  *                          .withColumnSeparator('|');
  * CsvReaderFormat<SomePojo> csvFormat =
- *          CsvReaderFormat.forSchema(mapper,schema, TypeInformation.of(SomePojo.class));
+ *          CsvReaderFormat.forSchema(() -> new CsvMapper(), schemaGenerator, TypeInformation.of(SomePojo.class));
  * FileSource<SomePojo> source =
  *         FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(filesPath)).build();
  * }</pre>
@@ -72,8 +76,8 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
 
     private static final long serialVersionUID = 1L;
 
-    private final CsvMapper mapper;
-    private final CsvSchema schema;
+    private final SerializableSupplier<CsvMapper> mapperFactory;
+    private final SerializableFunction<CsvMapper, CsvSchema> schemaGenerator;
     private final Class<Object> rootType;
     private final Converter<Object, T, Void> converter;
     private final TypeInformation<T> typeInformation;
@@ -81,14 +85,14 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
 
     @SuppressWarnings("unchecked")
     <R> CsvReaderFormat(
-            CsvMapper mapper,
-            CsvSchema schema,
+            SerializableSupplier<CsvMapper> mapperFactory,
+            SerializableFunction<CsvMapper, CsvSchema> schemaGenerator,
             Class<R> rootType,
             Converter<R, T, Void> converter,
             TypeInformation<T> typeInformation,
             boolean ignoreParseErrors) {
-        this.mapper = checkNotNull(mapper);
-        this.schema = checkNotNull(schema);
+        this.mapperFactory = checkNotNull(mapperFactory);
+        this.schemaGenerator = checkNotNull(schemaGenerator);
         this.rootType = (Class<Object>) checkNotNull(rootType);
         this.typeInformation = checkNotNull(typeInformation);
         this.converter = (Converter<Object, T, Void>) checkNotNull(converter);
@@ -104,23 +108,45 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
      */
     public static <T> CsvReaderFormat<T> forSchema(
             CsvSchema schema, TypeInformation<T> typeInformation) {
-        return forSchema(new CsvMapper(), schema, typeInformation);
+        return forSchema(() -> new CsvMapper(), ignored -> schema, typeInformation);
     }
 
     /**
-     * Builds a new {@code CsvReaderFormat} using a {@code CsvSchema} and a pre-created {@code
-     * CsvMapper}.
+     * @deprecated This method is limited to serializable {@link CsvMapper CsvMappers}, preventing
+     *     the usage of certain Jackson modules (like the {@link
+     *     org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
+     *     Java 8 Date/Time Serializers}). Use {@link #forSchema(Supplier, Function,
+     *     TypeInformation)} instead.
+     */
+    @Deprecated
+    public static <T> CsvReaderFormat<T> forSchema(
+            CsvMapper mapper, CsvSchema schema, TypeInformation<T> typeInformation) {
+        return new CsvReaderFormat<>(
+                () -> mapper,
+                ignored -> schema,
+                typeInformation.getTypeClass(),
+                (value, context) -> value,
+                typeInformation,
+                false);
+    }
+
+    /**
+     * Builds a new {@code CsvReaderFormat} using a {@code CsvSchema} generator and {@code
+     * CsvMapper} factory.
      *
-     * @param mapper The pre-created {@code CsvMapper}.
-     * @param schema The Jackson CSV schema configured for parsing specific CSV files.
+     * @param mapperFactory The factory creating the {@code CsvMapper}.
+     * @param schemaGenerator A generator that creates and configures the Jackson CSV schema for
+     *     parsing specific CSV files, from a mapper created by the mapper factory.
      * @param typeInformation The Flink type descriptor of the returned elements.
      * @param <T> The type of the returned elements.
      */
     public static <T> CsvReaderFormat<T> forSchema(
-            CsvMapper mapper, CsvSchema schema, TypeInformation<T> typeInformation) {
+            SerializableSupplier<CsvMapper> mapperFactory,
+            SerializableFunction<CsvMapper, CsvSchema> schemaGenerator,
+            TypeInformation<T> typeInformation) {
         return new CsvReaderFormat<>(
-                mapper,
-                schema,
+                mapperFactory,
+                schemaGenerator,
                 typeInformation.getTypeClass(),
                 (value, context) -> value,
                 typeInformation,
@@ -136,10 +162,9 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
      * @param <T> The type of the returned elements.
      */
     public static <T> CsvReaderFormat<T> forPojo(Class<T> pojoType) {
-        CsvMapper mapper = new CsvMapper();
         return forSchema(
-                mapper,
-                mapper.schemaFor(pojoType).withoutQuoteChar(),
+                () -> new CsvMapper(),
+                mapper -> mapper.schemaFor(pojoType).withoutQuoteChar(),
                 TypeInformation.of(pojoType));
     }
 
@@ -149,8 +174,8 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
      */
     public CsvReaderFormat<T> withIgnoreParseErrors() {
         return new CsvReaderFormat<T>(
-                this.mapper,
-                this.schema,
+                this.mapperFactory,
+                this.schemaGenerator,
                 this.rootType,
                 this.converter,
                 this.typeInformation,
@@ -160,8 +185,12 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
     @Override
     public StreamFormat.Reader<T> createReader(Configuration config, FSDataInputStream stream)
             throws IOException {
+        final CsvMapper csvMapper = mapperFactory.get();
         return new Reader<>(
-                mapper.readerFor(rootType).with(schema).readValues(stream),
+                csvMapper
+                        .readerFor(rootType)
+                        .with(schemaGenerator.apply(csvMapper))
+                        .readValues(stream),
                 converter,
                 ignoreParseErrors);
     }
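
The practical win of the factory indirection above is that only the lambdas must be
serializable, not the CsvMapper they produce. A sketch (mirroring the new test below)
of a mapper configuration the old API could not support, since JavaTimeModule is not
serializable:

    CsvReaderFormat<Pojo> format =
            CsvReaderFormat.forSchema(
                    () -> {
                        CsvMapper csvMapper = new CsvMapper();
                        // JavaTimeModule is not serializable; creating it inside the
                        // factory keeps the format itself serializable.
                        csvMapper.registerModule(new JavaTimeModule());
                        return csvMapper;
                    },
                    mapper -> mapper.schemaFor(Pojo.class),
                    TypeInformation.of(Pojo.class));
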
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvReaderFormatTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvReaderFormatTest.java
new file mode 100644
index 00000000000..f6b04748f98
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvReaderFormatTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.io.InputStreamFSInputWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class CsvReaderFormatTest {
+
+    @Test
+    void testForPojoSerializability() throws IOException, ClassNotFoundException {
+        final CsvReaderFormat<Pojo> format = CsvReaderFormat.forPojo(Pojo.class);
+
+        final byte[] bytes = InstantiationUtil.serializeObject(format);
+        InstantiationUtil.deserializeObject(bytes, CsvReaderFormatTest.class.getClassLoader());
+    }
+
+    @Test
+    void testForSchemaSerializability() throws IOException, ClassNotFoundException {
+        final CsvSchema schema = new CsvMapper().schemaFor(Pojo.class);
+        final CsvReaderFormat<Pojo> format =
+                CsvReaderFormat.forSchema(schema, TypeInformation.of(Pojo.class));
+
+        final byte[] bytes = InstantiationUtil.serializeObject(format);
+        InstantiationUtil.deserializeObject(bytes, CsvReaderFormatTest.class.getClassLoader());
+    }
+
+    @Test
+    void testForSchemaWithMapperSerializability() throws IOException, ClassNotFoundException {
+        final CsvReaderFormat<Pojo> format =
+                CsvReaderFormat.forSchema(
+                        () -> new CsvMapper(),
+                        mapper -> mapper.schemaFor(Pojo.class),
+                        TypeInformation.of(Pojo.class));
+
+        final byte[] bytes = InstantiationUtil.serializeObject(format);
+        InstantiationUtil.deserializeObject(bytes, CsvReaderFormatTest.class.getClassLoader());
+    }
+
+    /**
+     * Verifies that we don't eagerly use the mapper factory in the constructor to initialize
+     * some non-transient field.
+     */
+    @Test
+    void testForSchemaWithMapperSerializabilityWithUnserializableMapper()
+            throws IOException, ClassNotFoundException {
+        final CsvReaderFormat<Pojo> format =
+                CsvReaderFormat.forSchema(
+                        () -> {
+                            final CsvMapper csvMapper = new CsvMapper();
+                            // this module is not serializable
+                            csvMapper.registerModule(new JavaTimeModule());
+                            return csvMapper;
+                        },
+                        mapper -> mapper.schemaFor(Pojo.class),
+                        TypeInformation.of(Pojo.class));
+
+        final byte[] bytes = InstantiationUtil.serializeObject(format);
+        InstantiationUtil.deserializeObject(bytes, CsvReaderFormatTest.class.getClassLoader());
+    }
+
+    @Test
+    void testCreatedMapperPassedToSchemaFunction() throws IOException, ClassNotFoundException {
+        final CsvMapper csvMapper = new CsvMapper();
+
+        AtomicReference<CsvMapper> passedMapper = new AtomicReference<>();
+
+        final CsvReaderFormat<Pojo> format =
+                CsvReaderFormat.forSchema(
+                        () -> csvMapper,
+                        mapper -> {
+                            passedMapper.set(csvMapper);
+                            return mapper.schemaFor(Pojo.class);
+                        },
+                        TypeInformation.of(Pojo.class));
+
+        format.createReader(
+                new Configuration(),
+                new InputStreamFSInputWrapper(new ByteArrayInputStream(new byte[0])));
+        assertThat(passedMapper.get()).isSameAs(csvMapper);
+    }
+
+    public static class Pojo {
+        public int x;
+    }
+}
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
index 97fd78459fc..ea073432116 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
@@ -161,12 +161,14 @@ public class DataStreamCsvITCase {
     public void testCsvReaderFormatFromSchema() throws Exception {
         writeFile(outDir, "data.csv", CSV_LINES_PIPE_SEPARATED);
 
-        CsvMapper mapper = new CsvMapper();
-        CsvSchema schema =
-                mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');
-
         final CsvReaderFormat<CityPojo> csvFormat =
-                CsvReaderFormat.forSchema(mapper, schema, TypeInformation.of(CityPojo.class));
+                CsvReaderFormat.forSchema(
+                        () -> new CsvMapper(),
+                        mapper ->
+                                mapper.schemaFor(CityPojo.class)
+                                        .withoutQuoteChar()
+                                        .withColumnSeparator('|'),
+                        TypeInformation.of(CityPojo.class));
         final List<CityPojo> result = initializeSourceAndReadData(outDir, csvFormat);
 
         assertThat(Arrays.asList(POJOS)).isEqualTo(result);
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 94133e2288b..fa5fe677285 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -77,6 +77,12 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-csv</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
 
 		<!-- Beam dependencies -->
 
diff --git a/flink-python/pyflink/datastream/formats/csv.py b/flink-python/pyflink/datastream/formats/csv.py
index 82f405da191..2dc00f59d2d 100644
--- a/flink-python/pyflink/datastream/formats/csv.py
+++ b/flink-python/pyflink/datastream/formats/csv.py
@@ -17,11 +17,9 @@
 ################################################################################
 from typing import Optional, cast
 
-from py4j.java_gateway import get_java_class
 from pyflink.datastream.connectors import StreamFormat
 from pyflink.java_gateway import get_gateway
 from pyflink.table.types import DataType, DataTypes, _to_java_data_type, RowType, NumericType
-from pyflink.util.java_utils import to_jarray
 
 
 class CsvSchema(object):
@@ -295,29 +293,9 @@ class CsvReaderFormat(StreamFormat):
         Builds a :class:`CsvReaderFormat` using `CsvSchema`.
         """
         jvm = get_gateway().jvm
-        jackson = jvm.org.apache.flink.shaded.jackson2.com.fasterxml.jackson
-        constructor = get_java_class(jvm.org.apache.flink.formats.csv.CsvReaderFormat) \
-            .getDeclaredConstructor(
-            to_jarray(jvm.Class, [
-                get_java_class(jackson.dataformat.csv.CsvMapper),
-                get_java_class(jackson.dataformat.csv.CsvSchema),
-                get_java_class(jvm.Class),
-                get_java_class(jvm.org.apache.flink.formats.common.Converter),
-                get_java_class(jvm.org.apache.flink.api.common.typeinfo.TypeInformation),
-                get_java_class(jvm.boolean)
-            ])
-        )
-        constructor.setAccessible(True)
-        j_csv_format = constructor.newInstance(
-            to_jarray(jvm.Object, [
-                jackson.dataformat.csv.CsvMapper(),
+        j_csv_format = jvm.org.apache.flink.formats.csv.CsvReaderFormatFactory \
+            .createCsvReaderFormat(
                 schema._j_schema,
-                get_java_class(jackson.databind.JsonNode),
-                jvm.org.apache.flink.formats.csv.CsvToRowDataConverters(False).createRowConverter(
-                    _to_java_data_type(schema._data_type).getLogicalType(), True),
-                jvm.org.apache.flink.table.runtime.typeutils.InternalTypeInfo.of(
-                    _to_java_data_type(schema._data_type).getLogicalType()),
-                False
-            ])
-        )
+                _to_java_data_type(schema._data_type)
+            )
         return CsvReaderFormat(j_csv_format)
diff --git a/flink-python/src/main/java/org/apache/flink/formats/csv/CsvReaderFormatFactory.java b/flink-python/src/main/java/org/apache/flink/formats/csv/CsvReaderFormatFactory.java
new file mode 100644
index 00000000000..f5c845a6190
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/formats/csv/CsvReaderFormatFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+/** Util for creating a {@link CsvReaderFormat}. */
+public class CsvReaderFormatFactory {
+    public static CsvReaderFormat createCsvReaderFormat(CsvSchema schema, DataType dataType) {
+        Preconditions.checkArgument(dataType.getLogicalType() instanceof RowType);
+
+        return new CsvReaderFormat(
+                () -> new CsvMapper(),
+                ignored -> schema,
+                JsonNode.class,
+                new CsvToRowDataConverters(false)
+                        .createRowConverter(
+                                LogicalTypeUtils.toRowType(dataType.getLogicalType()), true),
+                InternalTypeInfo.of(dataType.getLogicalType()),
+                false);
+    }
+}
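
For reference, a sketch (not part of this commit) of how this helper could be called
from Java; the row DataType below is an assumption for illustration, while PyFlink
derives it from the wrapped CsvSchema:

    CsvSchema schema =
            CsvSchema.builder()
                    .addColumn(new CsvSchema.Column(0, "id", CsvSchema.ColumnType.NUMBER))
                    .build();
    DataType dataType = DataTypes.ROW(DataTypes.FIELD("id", DataTypes.INT()));

    // Produces a CsvReaderFormat over RowData, backed by a fresh CsvMapper per reader.
    CsvReaderFormat format = CsvReaderFormatFactory.createCsvReaderFormat(schema, dataType);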